Bottom halves in an AioContext are stored and removed in LIFO order,
which makes their execution order non-deterministic. This patch
replaces the stack with a queue to preserve the order in which
bottom halves are processed.

Signed-off-by: Pavel Dovgalyuk <pavel.dovga...@ispras.ru>
---
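As a quick aside (not part of the patch): the sketch below illustrates the
FIFO behaviour the queue gives us.  With QSIMPLEQ the bottom halves are
walked in the order they were created, whereas the old
"bh->next = ctx->first_bh" stack walked them newest-first.  It uses the
existing QSIMPLEQ_* macros from include/qemu/queue.h; DemoBH, demo_queue
and the id field are made-up names, and it is only meant to build inside
the QEMU tree.

    /* Illustrative only -- not part of the patch. */
    #include <stdio.h>
    #include "qemu/queue.h"

    typedef struct DemoBH {
        int id;                          /* stand-in for cb/opaque */
        QSIMPLEQ_ENTRY(DemoBH) next;     /* queue linkage, as in QEMUBH */
    } DemoBH;

    static QSIMPLEQ_HEAD(, DemoBH) demo_queue =
        QSIMPLEQ_HEAD_INITIALIZER(demo_queue);

    int main(void)
    {
        DemoBH a = { .id = 1 }, b = { .id = 2 }, c = { .id = 3 };
        DemoBH *bh;

        /* Insert at the tail, as aio_bh_new() now does. */
        QSIMPLEQ_INSERT_TAIL(&demo_queue, &a, next);
        QSIMPLEQ_INSERT_TAIL(&demo_queue, &b, next);
        QSIMPLEQ_INSERT_TAIL(&demo_queue, &c, next);

        /* Walks in creation order and prints 1 2 3; the old LIFO
         * list would have produced 3 2 1. */
        QSIMPLEQ_FOREACH(bh, &demo_queue, next) {
            printf("%d\n", bh->id);
        }
        return 0;
    }
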
 async.c              |   26 +++++++++++---------------
 include/block/aio.h  |    4 ++--
 include/qemu/queue.h |   18 ++++++++++++++++++
 3 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/async.c b/async.c
index 2b51e87..bd975c9 100644
--- a/async.c
+++ b/async.c
@@ -35,7 +35,7 @@ struct QEMUBH {
     AioContext *ctx;
     QEMUBHFunc *cb;
     void *opaque;
-    QEMUBH *next;
+    QSIMPLEQ_ENTRY(QEMUBH) next;
     bool scheduled;
     bool idle;
     bool deleted;
@@ -51,10 +51,7 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .opaque = opaque,
     };
     qemu_mutex_lock(&ctx->bh_lock);
-    bh->next = ctx->first_bh;
-    /* Make sure that the members are ready before putting bh into list */
-    smp_wmb();
-    ctx->first_bh = bh;
+    QSIMPLEQ_INSERT_TAIL_RCU(&ctx->bh_queue, bh, next);
     qemu_mutex_unlock(&ctx->bh_lock);
     return bh;
 }
@@ -62,16 +59,15 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
 /* Multiple occurrences of aio_bh_poll cannot be called concurrently */
 int aio_bh_poll(AioContext *ctx)
 {
-    QEMUBH *bh, **bhp, *next;
+    QEMUBH *bh, *next, *prev;
     int ret;
 
     ctx->walking_bh++;
 
     ret = 0;
-    for (bh = ctx->first_bh; bh; bh = next) {
+    QSIMPLEQ_FOREACH(bh, &ctx->bh_queue, next) {
         /* Make sure that fetching bh happens before accessing its members */
         smp_read_barrier_depends();
-        next = bh->next;
         /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
          * implicit memory barrier ensures that the callback sees all writes
          * done by the scheduling thread.  It also ensures that the scheduling
@@ -91,14 +87,13 @@ int aio_bh_poll(AioContext *ctx)
     /* remove deleted bhs */
     if (!ctx->walking_bh) {
         qemu_mutex_lock(&ctx->bh_lock);
-        bhp = &ctx->first_bh;
-        while (*bhp) {
-            bh = *bhp;
+        prev = NULL;
+        QSIMPLEQ_FOREACH_SAFE(bh, &ctx->bh_queue, next, next) {
             if (bh->deleted) {
-                *bhp = bh->next;
+                QSIMPLEQ_REMOVE_AFTER(&ctx->bh_queue, prev, QEMUBH, next);
                 g_free(bh);
             } else {
-                bhp = &bh->next;
+                prev = bh;
             }
         }
         qemu_mutex_unlock(&ctx->bh_lock);
@@ -157,7 +152,7 @@ aio_compute_timeout(AioContext *ctx)
     int timeout = -1;
     QEMUBH *bh;
 
-    for (bh = ctx->first_bh; bh; bh = bh->next) {
+    QSIMPLEQ_FOREACH(bh, &ctx->bh_queue, next) {
         if (!bh->deleted && bh->scheduled) {
             if (bh->idle) {
                 /* idle bottom halves will be polled at least
@@ -200,7 +195,7 @@ aio_ctx_check(GSource *source)
     AioContext *ctx = (AioContext *) source;
     QEMUBH *bh;
 
-    for (bh = ctx->first_bh; bh; bh = bh->next) {
+    QSIMPLEQ_FOREACH(bh, &ctx->bh_queue, next) {
         if (!bh->deleted && bh->scheduled) {
             return true;
        }
@@ -307,6 +302,7 @@ AioContext *aio_context_new(Error **errp)
     qemu_mutex_init(&ctx->bh_lock);
     rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
+    QSIMPLEQ_INIT(&ctx->bh_queue);
 
     return ctx;
 }
diff --git a/include/block/aio.h b/include/block/aio.h
index 7d1e26b..82cdf78 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -71,8 +71,8 @@ struct AioContext {
     /* lock to protect between bh's adders and deleter */
     QemuMutex bh_lock;
 
-    /* Anchor of the list of Bottom Halves belonging to the context */
-    struct QEMUBH *first_bh;
+    /* List of Bottom Halves belonging to the context */
+    QSIMPLEQ_HEAD(, QEMUBH) bh_queue;
 
     /* A simple lock used to protect the first_bh list, and ensure that
      * no callbacks are removed while we're walking and dispatching callbacks.
diff --git a/include/qemu/queue.h b/include/qemu/queue.h
index f781aa2..99564bc 100644
--- a/include/qemu/queue.h
+++ b/include/qemu/queue.h
@@ -271,6 +271,13 @@ struct {                                                        \
     (head)->sqh_last = &(elm)->field.sqe_next;                          \
 } while (/*CONSTCOND*/0)
 
+#define QSIMPLEQ_INSERT_TAIL_RCU(head, elm, field) do {                 \
+    (elm)->field.sqe_next = NULL;                                       \
+    smp_wmb();                                                          \
+    atomic_rcu_set((head)->sqh_last, (elm));                            \
+    (head)->sqh_last = &(elm)->field.sqe_next;                          \
+} while (/*CONSTCOND*/0)
+
 #define QSIMPLEQ_INSERT_AFTER(head, listelm, elm, field) do {           \
     if (((elm)->field.sqe_next = (listelm)->field.sqe_next) == NULL)    \
         (head)->sqh_last = &(elm)->field.sqe_next;                      \
@@ -306,6 +313,17 @@ struct {                                                        \
     }                                                                   \
 } while (/*CONSTCOND*/0)
 
+#define QSIMPLEQ_REMOVE_AFTER(head, curelm, type, field) do {           \
+    if ((curelm) == NULL) {                                             \
+        QSIMPLEQ_REMOVE_HEAD((head), field);                            \
+    } else {                                                            \
+        if (((curelm)->field.sqe_next =                                 \
+            (curelm)->field.sqe_next->field.sqe_next) == NULL) {        \
+                (head)->sqh_last = &(curelm)->field.sqe_next;           \
+        }                                                               \
+    }                                                                   \
+} while (/*CONSTCOND*/0)
+
 #define QSIMPLEQ_FOREACH(var, head, field)                              \
     for ((var) = ((head)->sqh_first);                                   \
         (var);                                                          \

