The condition for waiting on the s->free_sema queue depends on
both s->in_flight and s->state.  The latter is currently accessed
with atomics, but this is quite dubious and probably wrong.

Because s->state is also written in the main thread, for example by
the reconnect timer callback, it cannot be protected by a CoMutex
(a CoMutex can only be taken from coroutine context).  Introduce a
separate lock, the QemuMutex s->requests_lock, that can be used by
nbd_co_send_request(); later on this lock will also be used for
s->state.  There will not be any contention on the lock unless there
is a reconnect, so this is not performance sensitive.

Signed-off-by: Paolo Bonzini <pbonz...@redhat.com>
---
 block/nbd.c | 46 +++++++++++++++++++++++++++-------------------
 1 file changed, 27 insertions(+), 19 deletions(-)

diff --git a/block/nbd.c b/block/nbd.c
index 0ff41cb914..c908ea6ae3 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -72,17 +72,22 @@ typedef struct BDRVNBDState {
     QIOChannel *ioc; /* The current I/O channel */
     NBDExportInfo info;
 
-    CoMutex send_mutex;
+    /*
+     * Protects free_sema, in_flight, requests[].coroutine,
+     * reconnect_delay_timer.
+     */
+    QemuMutex requests_lock;
     CoQueue free_sema;
-
-    CoMutex receive_mutex;
     int in_flight;
+    NBDClientRequest requests[MAX_NBD_REQUESTS];
+    QEMUTimer *reconnect_delay_timer;
+
+    CoMutex send_mutex;
+    CoMutex receive_mutex;
     NBDClientState state;
 
-    QEMUTimer *reconnect_delay_timer;
     QEMUTimer *open_timer;
 
-    NBDClientRequest requests[MAX_NBD_REQUESTS];
     NBDReply reply;
     BlockDriverState *bs;
 
@@ -351,7 +356,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
     return 0;
 }
 
-/* called under s->send_mutex */
+/* Called with s->requests_lock taken.  */
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
     bool blocking = nbd_client_connecting_wait(s);
@@ -383,9 +388,9 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
         s->ioc = NULL;
     }
 
-    qemu_co_mutex_unlock(&s->send_mutex);
+    qemu_mutex_unlock(&s->requests_lock);
     nbd_co_do_establish_connection(s->bs, blocking, NULL);
-    qemu_co_mutex_lock(&s->send_mutex);
+    qemu_mutex_lock(&s->requests_lock);
 
     /*
      * The reconnect attempt is done (maybe successfully, maybe not), so
@@ -468,11 +473,10 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int rc, i = -1;
 
-    qemu_co_mutex_lock(&s->send_mutex);
-
+    qemu_mutex_lock(&s->requests_lock);
     while (s->in_flight == MAX_NBD_REQUESTS ||
            (!nbd_client_connected(s) && s->in_flight > 0)) {
-        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
+        qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
     }
 
     s->in_flight++;
@@ -493,14 +497,14 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
         }
     }
 
-    g_assert(qemu_in_coroutine());
     assert(i < MAX_NBD_REQUESTS);
-
     s->requests[i].coroutine = qemu_coroutine_self();
     s->requests[i].offset = request->from;
     s->requests[i].receiving = false;
     s->requests[i].reply_possible = true;
+    qemu_mutex_unlock(&s->requests_lock);
 
+    qemu_co_mutex_lock(&s->send_mutex);
     request->handle = INDEX_TO_HANDLE(s, i);
 
     assert(s->ioc);
@@ -520,17 +524,19 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
     } else {
         rc = nbd_send_request(s->ioc, request);
     }
+    qemu_co_mutex_unlock(&s->send_mutex);
 
-err:
     if (rc < 0) {
+        qemu_mutex_lock(&s->requests_lock);
+err:
         nbd_channel_error(s, rc);
         if (i != -1) {
             s->requests[i].coroutine = NULL;
         }
         s->in_flight--;
         qemu_co_queue_next(&s->free_sema);
+        qemu_mutex_unlock(&s->requests_lock);
     }
-    qemu_co_mutex_unlock(&s->send_mutex);
     return rc;
 }
 
@@ -1020,12 +1026,11 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     return true;
 
 break_loop:
+    qemu_mutex_lock(&s->requests_lock);
     s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
-
-    qemu_co_mutex_lock(&s->send_mutex);
     s->in_flight--;
     qemu_co_queue_next(&s->free_sema);
-    qemu_co_mutex_unlock(&s->send_mutex);
+    qemu_mutex_unlock(&s->requests_lock);
 
     return false;
 }
@@ -1858,8 +1863,9 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
     s->bs = bs;
-    qemu_co_mutex_init(&s->send_mutex);
+    qemu_mutex_init(&s->requests_lock);
     qemu_co_queue_init(&s->free_sema);
+    qemu_co_mutex_init(&s->send_mutex);
     qemu_co_mutex_init(&s->receive_mutex);
 
     if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
@@ -2046,9 +2052,11 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
 
     reconnect_delay_timer_del(s);
 
+    qemu_mutex_lock(&s->requests_lock);
     if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
     }
+    qemu_mutex_unlock(&s->requests_lock);
 
     nbd_co_establish_connection_cancel(s->conn);
 }
-- 
2.35.1



Reply via email to