Use a store-release when enqueuing a new call_rcu and a load-acquire when dequeuing, and read the tail only after checking that node->next is consistent. This is the standard message-passing pattern, and it is clearer than qatomic_mb_read/qatomic_mb_set.
Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> --- util/rcu.c | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/util/rcu.c b/util/rcu.c index e5b6e52be6f8..867607cd5a1e 100644 --- a/util/rcu.c +++ b/util/rcu.c @@ -189,8 +189,22 @@ static void enqueue(struct rcu_head *node) struct rcu_head **old_tail; node->next = NULL; + + /* + * Make this node the tail of the list. The node will be + * used by further enqueue operations, but it will not + * be dequeued yet... + */ old_tail = qatomic_xchg(&tail, &node->next); - qatomic_mb_set(old_tail, node); + + /* + * ... until it is pointed to from another item in the list. + * In the meanwhile, try_dequeue() will find a NULL next pointer + * and loop. + * + * Synchronizes with qatomic_load_acquire() in try_dequeue(). + */ + qatomic_store_release(old_tail, node); } static struct rcu_head *try_dequeue(void) @@ -198,25 +212,27 @@ static struct rcu_head *try_dequeue(void) struct rcu_head *node, *next; retry: + /* Head is only written by this thread, so no need for barriers. */ + node = head; + + /* If the head node has NULL in its next pointer, the value is + * wrong and we need to wait until its enqueuer finishes the update. + */ + next = qatomic_load_acquire(&node->next); + if (!next) { + return NULL; + } + /* Test for an empty list, which we do not expect. Note that for * the consumer head and tail are always consistent. The head * is consistent because only the consumer reads/writes it. * The tail, because it is the first step in the enqueuing. * It is only the next pointers that might be inconsistent. */ - if (head == &dummy && qatomic_mb_read(&tail) == &dummy.next) { + if (head == &dummy && qatomic_read(&tail) == &dummy.next) { abort(); } - /* If the head node has NULL in its next pointer, the value is - * wrong and we need to wait until its enqueuer finishes the update. 
- */ - node = head; - next = qatomic_mb_read(&head->next); - if (!next) { - return NULL; - } - /* Since we are the sole consumer, and we excluded the empty case * above, the queue will always have at least two nodes: the * dummy node, and the one being removed. So we do not need to update -- 2.40.0