Use a store-release when enqueuing a new call_rcu, and a load-acquire
when dequeuing; also read the tail only after checking that node->next
is consistent.  This is the standard message passing pattern, and it is
clearer than mb_read/mb_set.

Signed-off-by: Paolo Bonzini <pbonz...@redhat.com>
---
 util/rcu.c | 38 +++++++++++++++++++++++++++-----------
 1 file changed, 27 insertions(+), 11 deletions(-)

diff --git a/util/rcu.c b/util/rcu.c
index e5b6e52be6f8..867607cd5a1e 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -189,8 +189,22 @@ static void enqueue(struct rcu_head *node)
     struct rcu_head **old_tail;
 
     node->next = NULL;
+
+    /*
+     * Make this node the tail of the list.  The node will be
+     * used by further enqueue operations, but it will not
+     * be dequeued yet...
+     */
     old_tail = qatomic_xchg(&tail, &node->next);
-    qatomic_mb_set(old_tail, node);
+
+    /*
+     * ... until it is pointed to from another item in the list.
+     * In the meantime, try_dequeue() will find a NULL next pointer
+     * and loop.
+     *
+     * Synchronizes with qatomic_load_acquire() in try_dequeue().
+     */
+    qatomic_store_release(old_tail, node);
 }
 
 static struct rcu_head *try_dequeue(void)
@@ -198,25 +212,27 @@ static struct rcu_head *try_dequeue(void)
     struct rcu_head *node, *next;
 
 retry:
+    /* Head is only written by this thread, so no need for barriers.  */
+    node = head;
+
+    /* If the head node has NULL in its next pointer, the value is
+     * wrong and we need to wait until its enqueuer finishes the update.
+     */
+    next = qatomic_load_acquire(&node->next);
+    if (!next) {
+        return NULL;
+    }
+
     /* Test for an empty list, which we do not expect.  Note that for
      * the consumer head and tail are always consistent.  The head
      * is consistent because only the consumer reads/writes it.
      * The tail, because it is the first step in the enqueuing.
      * It is only the next pointers that might be inconsistent.
      */
-    if (head == &dummy && qatomic_mb_read(&tail) == &dummy.next) {
+    if (head == &dummy && qatomic_read(&tail) == &dummy.next) {
         abort();
     }
 
-    /* If the head node has NULL in its next pointer, the value is
-     * wrong and we need to wait until its enqueuer finishes the update.
-     */
-    node = head;
-    next = qatomic_mb_read(&head->next);
-    if (!next) {
-        return NULL;
-    }
-
     /* Since we are the sole consumer, and we excluded the empty case
      * above, the queue will always have at least two nodes: the
      * dummy node, and the one being removed.  So we do not need to update
-- 
2.40.0


Reply via email to