This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new fc745cd DISPATCH-2166: Improve message, message_content multithread access correctness fc745cd is described below commit fc745cdf9759d3d2bf0bd1aea752a81e401a3653 Author: Chuck Rolke <c...@apache.org> AuthorDate: Wed Jul 21 16:32:02 2021 -0400 DISPATCH-2166: Improve message, message_content multithread access correctness Many message and message_content variables have implicit thread ownership. * Each message has only one receiver that creates the content. * Each message content may have many senders that consume the content. Some variables are "owned" by the message_receive thread and are never read nor written by the message_send threads. And other variables are owned by the message send threads. Then more variables are shared among all the senders and receiver. This patch addresses the shared variables. * Unsuppress tsan qd_message_set_send_complete * Convert many bools to sys_atomic_t; adjust access methods * Add locking to some variable accesses This closes #1308 --- include/qpid/dispatch/atomic.h | 7 +-- src/message.c | 111 +++++++++++++++++++++++++---------------- src/message_private.h | 79 +++++++++++++++-------------- tests/message_test.c | 12 ++--- tests/tsan.supp | 3 -- 5 files changed, 119 insertions(+), 93 deletions(-) diff --git a/include/qpid/dispatch/atomic.h b/include/qpid/dispatch/atomic.h index 9eb09d6..32f4967 100644 --- a/include/qpid/dispatch/atomic.h +++ b/include/qpid/dispatch/atomic.h @@ -205,10 +205,11 @@ static inline void sys_atomic_destroy(sys_atomic_t *ref) #endif -#define SET_ATOMIC_FLAG(flag) sys_atomic_set(flag, 1) -#define CLEAR_ATOMIC_FLAG(flag) sys_atomic_set(flag, 0) +#define SET_ATOMIC_FLAG(flag) sys_atomic_set((flag), 1) +#define CLEAR_ATOMIC_FLAG(flag) sys_atomic_set((flag), 0) +#define SET_ATOMIC_BOOL(flag, value) sys_atomic_set((flag), ((value) ? 1 : 0)) -#define IS_ATOMIC_FLAG_SET(flag) (sys_atomic_get(flag) == 1) +#define IS_ATOMIC_FLAG_SET(flag) (sys_atomic_get(flag) == 1) /** Atomic increase: NOTE returns value *before* increase, like i++ */ static inline uint32_t sys_atomic_inc(sys_atomic_t *ref) { return sys_atomic_add((ref), 1); } diff --git a/src/message.c b/src/message.c index e867913..2d8d579 100644 --- a/src/message.c +++ b/src/message.c @@ -926,8 +926,7 @@ static void qd_message_parse_priority(qd_message_t *in_msg) qd_message_content_t *content = MSG_CONTENT(in_msg); qd_iterator_t *iter = qd_message_field_iterator(in_msg, QD_FIELD_HEADER); - content->priority_parsed = true; - content->priority_present = false; + SET_ATOMIC_FLAG(&content->priority_parsed); if (!!iter) { qd_parsed_field_t *field = qd_parse(iter); @@ -936,8 +935,8 @@ static void qd_message_parse_priority(qd_message_t *in_msg) qd_parsed_field_t *priority_field = qd_parse_sub_value(field, 1); if (qd_parse_tag(priority_field) != QD_AMQP_NULL) { uint32_t value = qd_parse_as_uint(priority_field); - content->priority = value > QDR_MAX_PRIORITY ? QDR_MAX_PRIORITY : (uint8_t) (value & 0x00ff); - content->priority_present = true; + value = MIN(value, QDR_MAX_PRIORITY); + sys_atomic_set(&content->priority, value); } } } @@ -1022,8 +1021,15 @@ qd_message_t *qd_message() ZERO(msg->content); msg->content->lock = sys_mutex(); - sys_atomic_init(&msg->content->ref_count, 1); sys_atomic_init(&msg->content->aborted, 0); + sys_atomic_init(&msg->content->discard, 0); + sys_atomic_init(&msg->content->ma_stream, 0); + sys_atomic_init(&msg->content->no_body, 0); + sys_atomic_init(&msg->content->oversize, 0); + sys_atomic_init(&msg->content->priority, QDR_DEFAULT_PRIORITY); + sys_atomic_init(&msg->content->priority_parsed, 0); + sys_atomic_init(&msg->content->receive_complete, 0); + sys_atomic_init(&msg->content->ref_count, 1); msg->content->parse_depth = QD_DEPTH_NONE; return (qd_message_t*) msg; } @@ -1040,6 +1046,8 @@ void qd_message_free(qd_message_t *in_msg) qd_buffer_list_free_buffers(&msg->ma_trace); qd_buffer_list_free_buffers(&msg->ma_ingress); + sys_atomic_destroy(&msg->send_complete); + qd_message_content_t *content = msg->content; if (msg->is_fanout) { @@ -1104,6 +1112,14 @@ void qd_message_free(qd_message_t *in_msg) sys_mutex_free(content->lock); sys_atomic_destroy(&content->aborted); + sys_atomic_destroy(&content->discard); + sys_atomic_destroy(&content->ma_stream); + sys_atomic_destroy(&content->no_body); + sys_atomic_destroy(&content->oversize); + sys_atomic_destroy(&content->priority); + sys_atomic_destroy(&content->priority_parsed); + sys_atomic_destroy(&content->receive_complete); + sys_atomic_destroy(&content->ref_count); free_qd_message_content_t(content); } @@ -1133,7 +1149,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg) copy->sent_depth = QD_DEPTH_NONE; copy->cursor.buffer = 0; copy->cursor.cursor = 0; - copy->send_complete = false; + sys_atomic_init(©->send_complete, 0); copy->tag_sent = false; copy->is_fanout = false; @@ -1186,7 +1202,7 @@ void qd_message_message_annotations(qd_message_t *in_msg) } if (ma_pf_stream) { - content->ma_stream = qd_parse_as_int(ma_pf_stream); + SET_ATOMIC_BOOL(&content->ma_stream, qd_parse_as_int(ma_pf_stream)); qd_parse_free(ma_pf_stream); } @@ -1225,7 +1241,7 @@ int qd_message_get_phase_annotation(const qd_message_t *in_msg) void qd_message_set_stream_annotation(qd_message_t *in_msg, bool stream) { qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; - msg->content->ma_stream = stream; + SET_ATOMIC_BOOL(&msg->content->ma_stream, stream); } void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t *ingress_field) @@ -1241,7 +1257,7 @@ bool qd_message_is_discard(qd_message_t *msg) if (!msg) return false; qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg; - return pvt_msg->content->discard; + return IS_ATOMIC_FLAG_SET(&pvt_msg->content->discard); } void qd_message_set_discard(qd_message_t *msg, bool discard) @@ -1250,7 +1266,7 @@ void qd_message_set_discard(qd_message_t *msg, bool discard) return; qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg; - pvt_msg->content->discard = discard; + SET_ATOMIC_BOOL(&pvt_msg->content->discard, discard); } @@ -1304,10 +1320,10 @@ uint8_t qd_message_get_priority(qd_message_t *msg) { qd_message_content_t *content = MSG_CONTENT(msg); - if (!content->priority_parsed) + if (!IS_ATOMIC_FLAG_SET(&content->priority_parsed)) qd_message_parse_priority(msg); - return content->priority_present ? content->priority : QDR_DEFAULT_PRIORITY; + return sys_atomic_get(&content->priority); } bool qd_message_receive_complete(qd_message_t *in_msg) @@ -1315,7 +1331,7 @@ bool qd_message_receive_complete(qd_message_t *in_msg) if (!in_msg) return false; qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; - return msg->content->receive_complete; + return IS_ATOMIC_FLAG_SET(&msg->content->receive_complete); } @@ -1325,7 +1341,7 @@ bool qd_message_send_complete(qd_message_t *in_msg) return false; qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; - return msg->send_complete; + return IS_ATOMIC_FLAG_SET(&msg->send_complete); } @@ -1333,7 +1349,7 @@ void qd_message_set_send_complete(qd_message_t *in_msg) { if (!!in_msg) { qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; - msg->send_complete = true; + SET_ATOMIC_FLAG(&msg->send_complete); } } @@ -1346,7 +1362,7 @@ void qd_message_set_receive_complete(qd_message_t *in_msg) LOCK(content->lock); - content->receive_complete = true; + SET_ATOMIC_FLAG(&content->receive_complete); if (content->q2_input_holdoff) { content->q2_input_holdoff = false; q2_unblock = content->q2_unblocker; @@ -1365,7 +1381,7 @@ void qd_message_set_no_body(qd_message_t *in_msg) { if (!!in_msg) { qd_message_content_t *content = MSG_CONTENT(in_msg); - content->no_body = true; + SET_ATOMIC_FLAG(&content->no_body); } } @@ -1373,7 +1389,7 @@ bool qd_message_no_body(qd_message_t *in_msg) { if (!!in_msg) { qd_message_content_t *content = MSG_CONTENT(in_msg); - return content->no_body; + return IS_ATOMIC_FLAG_SET(&content->no_body); } return false; @@ -1425,7 +1441,7 @@ qd_message_t *discard_receive(pn_delivery_t *delivery, } pn_record_t *record = pn_delivery_attachments(delivery); pn_record_set(record, PN_DELIVERY_CTX, 0); - if (msg->content->oversize) { + if (IS_ATOMIC_FLAG_SET(&msg->content->oversize)) { // Aborting the content disposes of downstream copies. // This has no effect on the received message. SET_ATOMIC_FLAG(&msg->content->aborted); @@ -1499,7 +1515,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) // but not process the message for delivery. // Oversize messages are also discarded. // - if (msg->content->discard) { + if (IS_ATOMIC_FLAG_SET(&msg->content->discard)) { return discard_receive(delivery, link, (qd_message_t *)msg); } @@ -1509,11 +1525,14 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) // have been processed and freed by outbound processing then // message holdoff is cleared and receiving may continue. // + LOCK(msg->content->lock); if (!qd_link_is_q2_limit_unbounded(qdl) && !msg->content->disable_q2_holdoff) { if (msg->content->q2_input_holdoff) { + UNLOCK(msg->content->lock); return (qd_message_t*)msg; } } + UNLOCK(msg->content->lock); // Loop until msg is complete, error seen, or incoming bytes are consumed qd_message_content_t *content = msg->content; @@ -1616,8 +1635,8 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) qd_connection_t *conn = qd_link_connection(qdl); qd_connection_log_policy_denial(qdl, "DENY AMQP Transfer maxMessageSize exceeded"); qd_policy_count_max_size_event(link, conn); - content->discard = true; - content->oversize = true; + SET_ATOMIC_FLAG(&content->discard); + SET_ATOMIC_FLAG(&content->oversize); return discard_receive(delivery, link, (qd_message_t*)msg); } } @@ -1690,7 +1709,7 @@ static void compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list !DEQ_IS_EMPTY(msg->ma_trace) || !DEQ_IS_EMPTY(msg->ma_ingress) || msg->ma_phase != 0 || - msg->content->ma_stream) { + IS_ATOMIC_FLAG_SET(&msg->content->ma_stream)) { if (!map_started) { qd_compose_start_map(out_ma); @@ -1721,9 +1740,9 @@ static void compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list field_count++; } - if (msg->content->ma_stream) { + if (IS_ATOMIC_FLAG_SET(&msg->content->ma_stream)) { qd_compose_insert_symbol(field, QD_MA_STREAM); - qd_compose_insert_int(field, msg->content->ma_stream); + qd_compose_insert_int(field, 1); field_count++; } // pad out to N fields @@ -1799,7 +1818,7 @@ void qd_message_send(qd_message_t *in_msg, if (IS_ATOMIC_FLAG_SET(&content->aborted)) { // Message is aborted before any part of it has been sent. // Declare the message to be sent, - msg->send_complete = true; + SET_ATOMIC_FLAG(&msg->send_complete); // If the outgoing delivery is not already aborted then abort it. if (!pn_delivery_aborted(pn_link_current(pnl))) { pn_delivery_abort(pn_link_current(pnl)); @@ -1926,7 +1945,7 @@ void qd_message_send(qd_message_t *in_msg, // get a link detach event for this link // SET_ATOMIC_FLAG(&content->aborted); - msg->send_complete = true; + SET_ATOMIC_FLAG(&msg->send_complete); if (!pn_delivery_aborted(pn_link_current(pnl))) { pn_delivery_abort(pn_link_current(pnl)); } @@ -1979,7 +1998,7 @@ void qd_message_send(qd_message_t *in_msg, msg->cursor.buffer = next_buf; msg->cursor.cursor = (next_buf) ? qd_buffer_base(next_buf) : 0; - msg->send_complete = (complete && !next_buf); + SET_ATOMIC_BOOL(&msg->send_complete, (complete && !next_buf)); } buf = next_buf; @@ -2004,7 +2023,7 @@ void qd_message_send(qd_message_t *in_msg, if (IS_ATOMIC_FLAG_SET(&content->aborted)) { if (pn_link_current(pnl)) { - msg->send_complete = true; + SET_ATOMIC_FLAG(&msg->send_complete); if (!pn_delivery_aborted(pn_link_current(pnl))) { pn_delivery_abort(pn_link_current(pnl)); } @@ -2040,7 +2059,7 @@ static qd_message_depth_status_t message_check_depth_LH(qd_message_content_t *co } if (rc == QD_SECTION_NEED_MORE) { - if (!content->receive_complete) + if (!IS_ATOMIC_FLAG_SET(&content->receive_complete)) return QD_MESSAGE_DEPTH_INCOMPLETE; // no more data is going to come. OK if at the end and optional: @@ -2066,7 +2085,7 @@ static qd_message_depth_status_t qd_message_check_LH(qd_message_content_t *conte qd_buffer_t *buffer = DEQ_HEAD(content->buffers); if (!buffer) { - return content->receive_complete ? QD_MESSAGE_DEPTH_INVALID : QD_MESSAGE_DEPTH_INCOMPLETE; + return IS_ATOMIC_FLAG_SET(&content->receive_complete) ? QD_MESSAGE_DEPTH_INVALID : QD_MESSAGE_DEPTH_INCOMPLETE; } if (content->parse_buffer == 0) { @@ -2295,7 +2314,7 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b { qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0); qd_message_content_t *content = MSG_CONTENT(msg); - content->receive_complete = true; + SET_ATOMIC_FLAG(&content->receive_complete); qd_compose_start_list(field); qd_compose_insert_bool(field, 0); // durable @@ -2347,7 +2366,7 @@ void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field, bool co qd_buffer_list_t *field_buffers = qd_compose_buffers(field); content->buffers = *field_buffers; - content->receive_complete = complete; + SET_ATOMIC_BOOL(&content->receive_complete, complete); DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers. } @@ -2356,7 +2375,7 @@ void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field, bool co void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, bool receive_complete) { qd_message_content_t *content = MSG_CONTENT(msg); - content->receive_complete = receive_complete; + SET_ATOMIC_BOOL(&content->receive_complete, receive_complete); qd_buffer_list_t *field1_buffers = qd_compose_buffers(field1); qd_buffer_list_t *field2_buffers = qd_compose_buffers(field2); @@ -2369,7 +2388,7 @@ void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, qd_com void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, qd_composed_field_t *field3, bool receive_complete) { qd_message_content_t *content = MSG_CONTENT(msg); - content->receive_complete = receive_complete; + SET_ATOMIC_BOOL(&content->receive_complete, receive_complete); qd_buffer_list_t *field1_buffers = qd_compose_buffers(field1); qd_buffer_list_t *field2_buffers = qd_compose_buffers(field2); qd_buffer_list_t *field3_buffers = qd_compose_buffers(field3); @@ -2383,7 +2402,7 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com void qd_message_compose_5(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, qd_composed_field_t *field3, qd_composed_field_t *field4, bool receive_complete) { qd_message_content_t *content = MSG_CONTENT(msg); - content->receive_complete = receive_complete; + SET_ATOMIC_BOOL(&content->receive_complete, receive_complete); qd_buffer_list_t *field1_buffers = qd_compose_buffers(field1); qd_buffer_list_t *field2_buffers = qd_compose_buffers(field2); qd_buffer_list_t *field3_buffers = qd_compose_buffers(field3); @@ -2803,10 +2822,8 @@ qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *in_msg break; case QD_SECTION_NEED_MORE: - if (msg->content->receive_complete) - result = QD_MESSAGE_STREAM_DATA_NO_MORE; - else - result = QD_MESSAGE_STREAM_DATA_INCOMPLETE; + result = IS_ATOMIC_FLAG_SET(&msg->content->receive_complete) ? + QD_MESSAGE_STREAM_DATA_NO_MORE : QD_MESSAGE_STREAM_DATA_INCOMPLETE; break; } @@ -2901,7 +2918,8 @@ int qd_message_get_phase_val(qd_message_t *msg) int qd_message_is_streaming(qd_message_t *msg) { - return ((qd_message_pvt_t*) msg)->content->ma_stream; + qd_message_pvt_t *msg_pvt = (qd_message_pvt_t *)msg; + return IS_ATOMIC_FLAG_SET(&msg_pvt->content->ma_stream); } @@ -2946,7 +2964,14 @@ bool _Q2_holdoff_should_unblock_LH(const qd_message_content_t *content) bool qd_message_is_Q2_blocked(const qd_message_t *msg) { - return ((const qd_message_pvt_t*)msg)->content->q2_input_holdoff; + qd_message_pvt_t *msg_pvt = (qd_message_pvt_t*) msg; + qd_message_content_t *content = msg_pvt->content; + + bool blocked; + LOCK(content->lock); + blocked = content->q2_input_holdoff; + UNLOCK(content->lock); + return blocked; } @@ -2969,7 +2994,7 @@ void qd_message_set_aborted(const qd_message_t *msg) bool qd_message_oversize(const qd_message_t *msg) { qd_message_content_t * mc = MSG_CONTENT(msg); - return mc->oversize; + return IS_ATOMIC_FLAG_SET(&mc->oversize); } diff --git a/src/message_private.h b/src/message_private.h index cdb0e28..e7217e8 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -115,57 +115,60 @@ typedef struct { qd_field_location_t field_group_sequence; qd_field_location_t field_reply_to_group_id; - qd_buffer_t *parse_buffer; // Pointer to the buffer where parsing should resume, if needed - unsigned char *parse_cursor; // Pointer to octet in parse_buffer where parsing should resume, if needed - qd_message_depth_t parse_depth; // The depth to which this message content has been parsed - qd_iterator_t *ma_field_iter_in; // 'message field iterator' for msg.FIELD_MESSAGE_ANNOTATION + qd_buffer_t *parse_buffer; // Buffer where parsing should resume + unsigned char *parse_cursor; // Octet in parse_buffer where parsing should resume + qd_message_depth_t parse_depth; // Depth to which message content has been parsed + qd_iterator_t *ma_field_iter_in; // Iter for msg.FIELD_MESSAGE_ANNOTATION qd_iterator_pointer_t ma_user_annotation_blob; // Original user annotations - // with router annotations stripped + // with router annotations stripped uint32_t ma_count; // Number of map elements in blob - // after the router fields stripped + // after router fields stripped qd_parsed_field_t *ma_pf_ingress; qd_parsed_field_t *ma_pf_phase; qd_parsed_field_t *ma_pf_to_override; qd_parsed_field_t *ma_pf_trace; int ma_int_phase; - bool ma_stream; // indicates whether this message is streaming - uint64_t max_message_size; // configured max; 0 if no max to enforce - uint64_t bytes_received; // bytes returned by pn_link_recv() when enforcing max_message_size - size_t protected_buffers; // count of permanent buffers that hold the message headers - uint32_t fanout; // The number of receivers for this message, including in-process subscribers. - - qd_message_q2_unblocker_t q2_unblocker; // callback and context to signal Q2 unblocked to receiver - - bool ma_parsed; // have parsed annotations in incoming message - bool discard; // Should this message be discarded? - bool receive_complete; // true if the message has been completely received, false otherwise - bool q2_input_holdoff; // hold off calling pn_link_recv - bool disable_q2_holdoff; // Disable the Q2 flow control - bool priority_parsed; - bool priority_present; - bool oversize; // policy oversize handling in effect - bool no_body; // Used for http2 messages. If no_body is true, the HTTP request had no body - uint8_t priority; // The priority of this message - sys_atomic_t aborted; + sys_atomic_t ma_stream; // Message is streaming + uint64_t max_message_size; // Configured max; 0 if no max to enforce + uint64_t bytes_received; // Bytes returned by pn_link_recv() + // when enforcing max_message_size + size_t protected_buffers; // Count of permanent buffers that hold message headers + uint32_t fanout; // Number of receivers for this message + // including in-process subscribers. + + qd_message_q2_unblocker_t q2_unblocker; // Callback and context to signal Q2 unblocked to receiver + + bool ma_parsed; // Have parsed incoming message annotations message + sys_atomic_t discard; // Message is being discarded + sys_atomic_t receive_complete; // Message has been completely received + bool q2_input_holdoff; // Q2 state: hold off calling pn_link_recv + bool disable_q2_holdoff; // Disable Q2 flow control + sys_atomic_t priority_parsed; // Message priority has been parsed + sys_atomic_t oversize; // Policy oversize-message handling in effect + sys_atomic_t no_body; // HTTP2 request has no body + sys_atomic_t priority; // Message AMQP priority + sys_atomic_t aborted; // Message has been aborted } qd_message_content_t; struct qd_message_pvt_t { - qd_iterator_pointer_t cursor; // A pointer to the current location of the outgoing byte stream. - qd_message_depth_t message_depth; // What is the depth of the message that has been received so far - qd_message_depth_t sent_depth; // How much of the message has been sent? QD_DEPTH_NONE means nothing has been sent so far, QD_DEPTH_HEADER means the header has already been sent, dont send it again and so on. - qd_message_content_t *content; // The actual content of the message. The content is never copied - qd_buffer_list_t ma_to_override; // to field in outgoing message annotations. - qd_buffer_list_t ma_trace; // trace list in outgoing message annotations - qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations - int ma_phase; // phase for the override address - qd_message_stream_data_list_t stream_data_list; // TODO - move this to the content for one-time parsing (TLR) - unsigned char *body_cursor; // tracks the point in the content buffer chain - qd_buffer_t *body_buffer; // to parse the next body data section (if it exists) + qd_iterator_pointer_t cursor; // Pointer to current location of outgoing byte stream. + qd_message_depth_t message_depth; // Depth of incoming received message + qd_message_depth_t sent_depth; // Depth of outgoing sent message + qd_message_content_t *content; // Singleton content shared by reference between + // incoming and all outgoing copies + qd_buffer_list_t ma_to_override; // To field in outgoing message annotations. + qd_buffer_list_t ma_trace; // Trace list in outgoing message annotations + qd_buffer_list_t ma_ingress; // Ingress field in outgoing message annotations + int ma_phase; // Phase for override address + qd_message_stream_data_list_t stream_data_list;// Stream data parse structure + // TODO - move this to the content for one-time parsing (TLR) + unsigned char *body_cursor; // Stream: tracks the point in the content buffer chain + qd_buffer_t *body_buffer; // Stream: to parse the next body data section, if any bool strip_annotations_in; - bool send_complete; // Has the message been completely received and completely sent? + sys_atomic_t send_complete; // Message has been been completely sent bool tag_sent; // Tags are sent - bool is_fanout; // If msg is an outgoing fanout + bool is_fanout; // Message is an outgoing fanout }; ALLOC_DECLARE(qd_message_t); diff --git a/tests/message_test.c b/tests/message_test.c index 5e146da..242f819 100644 --- a/tests/message_test.c +++ b/tests/message_test.c @@ -63,7 +63,7 @@ static void set_content(qd_message_content_t *content, unsigned char *buffer, si qd_buffer_insert(buf, segment); DEQ_INSERT_TAIL(content->buffers, buf); } - content->receive_complete = true; + SET_ATOMIC_FLAG(&content->receive_complete); } @@ -593,7 +593,7 @@ static char *test_incomplete_annotations(void *context) msg = qd_message(); qd_message_content_t *content = MSG_CONTENT(msg); set_content(content, buffer, 100); - content->receive_complete = false; // more data coming! + CLEAR_ATOMIC_FLAG(&content->receive_complete); // more data coming! if (qd_message_check_depth(msg, QD_DEPTH_MESSAGE_ANNOTATIONS) != QD_MESSAGE_DEPTH_INCOMPLETE) { result = "Error: incomplete message was not detected!"; goto exit; @@ -626,7 +626,7 @@ static char *test_check_weird_messages(void *context) 0xc1, 0x01, 0x00}; // first test an incomplete pattern: set_content(MSG_CONTENT(msg), da_map, 4); - MSG_CONTENT(msg)->receive_complete = false; + CLEAR_ATOMIC_FLAG(&(MSG_CONTENT(msg)->receive_complete)); qd_message_depth_status_t mc = qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS); if (mc != QD_MESSAGE_DEPTH_INCOMPLETE) { result = "Expected INCOMPLETE status"; @@ -635,7 +635,7 @@ static char *test_check_weird_messages(void *context) // full pattern, but no tag set_content(MSG_CONTENT(msg), &da_map[4], 6); - MSG_CONTENT(msg)->receive_complete = false; + CLEAR_ATOMIC_FLAG(&(MSG_CONTENT(msg)->receive_complete)); mc = qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS); if (mc != QD_MESSAGE_DEPTH_INCOMPLETE) { result = "Expected INCOMPLETE status"; @@ -644,7 +644,7 @@ static char *test_check_weird_messages(void *context) // add tag, but incomplete field: set_content(MSG_CONTENT(msg), &da_map[10], 1); - MSG_CONTENT(msg)->receive_complete = false; + CLEAR_ATOMIC_FLAG(&(MSG_CONTENT(msg)->receive_complete)); mc = qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS); if (mc != QD_MESSAGE_DEPTH_INCOMPLETE) { result = "Expected INCOMPLETE status"; @@ -664,7 +664,7 @@ static char *test_check_weird_messages(void *context) qd_message_free(msg); msg = qd_message(); set_content(MSG_CONTENT(msg), bad_hdr, sizeof(bad_hdr)); - MSG_CONTENT(msg)->receive_complete = false; + CLEAR_ATOMIC_FLAG(&(MSG_CONTENT(msg)->receive_complete)); mc = qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS); // looking _past_ header! if (mc != QD_MESSAGE_DEPTH_INVALID) { result = "Bad tag not detected!"; diff --git a/tests/tsan.supp b/tests/tsan.supp index 4312eae..9196c62 100644 --- a/tests/tsan.supp +++ b/tests/tsan.supp @@ -79,9 +79,6 @@ race:qdr_delivery_mcast_outbound_disposition_CT # DISPATCH-2157 race:^qd_message_send$ -# DISPATCH-2166 -race:qd_message_set_send_complete - # # External libraries # --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org