Am 21.12.2023 um 02:49 hat Stefan Hajnoczi geschrieben: > NBDClient has a number of fields that are accessed by both the export > AioContext and the main loop thread. When the AioContext lock is removed > these fields will need another form of protection. > > Add NBDClient->lock and protect fields that are accessed by both > threads. Also add assertions where possible and otherwise add doc > comments stating assumptions about which thread and lock holding. > > Note this patch moves the client->recv_coroutine assertion from > nbd_co_receive_request() to nbd_trip() where client->lock is held. > > Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com> > --- > nbd/server.c | 128 +++++++++++++++++++++++++++++++++++++-------------- > 1 file changed, 94 insertions(+), 34 deletions(-) > > diff --git a/nbd/server.c b/nbd/server.c > index 527fbdab4a..4008ec7df9 100644 > --- a/nbd/server.c > +++ b/nbd/server.c > @@ -125,23 +125,25 @@ struct NBDClient { > int refcount; /* atomic */ > void (*close_fn)(NBDClient *client, bool negotiated); > > + QemuMutex lock; > + > NBDExport *exp; > QCryptoTLSCreds *tlscreds; > char *tlsauthz; > QIOChannelSocket *sioc; /* The underlying data channel */ > QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */ > > - Coroutine *recv_coroutine; > + Coroutine *recv_coroutine; /* protected by lock */ > > CoMutex send_lock; > Coroutine *send_coroutine; > > - bool read_yielding; > - bool quiescing; > + bool read_yielding; /* protected by lock */ > + bool quiescing; /* protected by lock */ > > QTAILQ_ENTRY(NBDClient) next; > - int nb_requests; > - bool closing; > + int nb_requests; /* protected by lock */ > + bool closing; /* protected by lock */ > > uint32_t check_align; /* If non-zero, check for aligned client requests > */ > > @@ -1415,11 +1417,18 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t > size, Error **errp) > > len = qio_channel_readv(client->ioc, &iov, 1, errp); > if (len == QIO_CHANNEL_ERR_BLOCK) { > - 
client->read_yielding = true; > + WITH_QEMU_LOCK_GUARD(&client->lock) { > + if (client->quiescing) { > + return -EAGAIN; > + }
Why did you add another client->quiescing check here? If it is to address a race, I think you only made the window a bit smaller, but between releasing the lock and yielding, the field could still change, so drain needs to handle this case anyway. > + client->read_yielding = true; > + } > qio_channel_yield(client->ioc, G_IO_IN); > - client->read_yielding = false; > - if (client->quiescing) { > - return -EAGAIN; > + WITH_QEMU_LOCK_GUARD(&client->lock) { > + client->read_yielding = false; > + if (client->quiescing) { > + return -EAGAIN; > + } > } > continue; > } else if (len < 0) { > @@ -1528,6 +1537,7 @@ void nbd_client_put(NBDClient *client) > blk_exp_unref(&client->exp->common); > } > g_free(client->contexts.bitmaps); > + qemu_mutex_destroy(&client->lock); > g_free(client); > } > } > @@ -1536,11 +1546,13 @@ static void client_close(NBDClient *client, bool > negotiated) > { > assert(qemu_in_main_thread()); > > - if (client->closing) { > - return; > - } > + WITH_QEMU_LOCK_GUARD(&client->lock) { > + if (client->closing) { > + return; > + } > > - client->closing = true; > + client->closing = true; > + } > > /* Force requests to finish. They will drop their own references, > * then we'll close the socket and free the NBDClient.
> @@ -1554,6 +1566,7 @@ static void client_close(NBDClient *client, bool > negotiated) > } > } > > +/* Runs in export AioContext with client->lock held */ > static NBDRequestData *nbd_request_get(NBDClient *client) > { > NBDRequestData *req; > @@ -1566,6 +1579,7 @@ static NBDRequestData *nbd_request_get(NBDClient > *client) > return req; > } > > +/* Runs in export AioContext with client->lock held */ > static void nbd_request_put(NBDRequestData *req) > { > NBDClient *client = req->client; > @@ -1589,14 +1603,18 @@ static void blk_aio_attached(AioContext *ctx, void > *opaque) > NBDExport *exp = opaque; > NBDClient *client; > > + assert(qemu_in_main_thread()); > + > trace_nbd_blk_aio_attached(exp->name, ctx); > > exp->common.ctx = ctx; > > QTAILQ_FOREACH(client, &exp->clients, next) { > - assert(client->nb_requests == 0); > - assert(client->recv_coroutine == NULL); > - assert(client->send_coroutine == NULL); > + WITH_QEMU_LOCK_GUARD(&client->lock) { > + assert(client->nb_requests == 0); > + assert(client->recv_coroutine == NULL); > + assert(client->send_coroutine == NULL); > + } > } > } > > @@ -1604,6 +1622,8 @@ static void blk_aio_detach(void *opaque) > { > NBDExport *exp = opaque; > > + assert(qemu_in_main_thread()); > + > trace_nbd_blk_aio_detach(exp->name, exp->common.ctx); > > exp->common.ctx = NULL; > @@ -1614,8 +1634,12 @@ static void nbd_drained_begin(void *opaque) > NBDExport *exp = opaque; > NBDClient *client; > > + assert(qemu_in_main_thread()); > + > QTAILQ_FOREACH(client, &exp->clients, next) { > - client->quiescing = true; > + WITH_QEMU_LOCK_GUARD(&client->lock) { > + client->quiescing = true; > + } > } > } > > @@ -1624,9 +1648,13 @@ static void nbd_drained_end(void *opaque) > NBDExport *exp = opaque; > NBDClient *client; > > + assert(qemu_in_main_thread()); > + > QTAILQ_FOREACH(client, &exp->clients, next) { > - client->quiescing = false; > - nbd_client_receive_next_request(client); > + WITH_QEMU_LOCK_GUARD(&client->lock) { > + client->quiescing = 
false; > + nbd_client_receive_next_request(client); > + } > } > } > > @@ -1635,17 +1663,21 @@ static bool nbd_drained_poll(void *opaque) > NBDExport *exp = opaque; > NBDClient *client; > > + assert(qemu_in_main_thread()); > + > QTAILQ_FOREACH(client, &exp->clients, next) { > - if (client->nb_requests != 0) { > - /* > - * If there's a coroutine waiting for a request on nbd_read_eof() > - * enter it here so we don't depend on the client to wake it up. > - */ > - if (client->recv_coroutine != NULL && client->read_yielding) { > - qio_channel_wake_read(client->ioc); > + WITH_QEMU_LOCK_GUARD(&client->lock) { > + if (client->nb_requests != 0) { > + /* > + * If there's a coroutine waiting for a request on > nbd_read_eof() > + * enter it here so we don't depend on the client to wake it > up. > + */ > + if (client->recv_coroutine != NULL && client->read_yielding) > { > + qio_channel_wake_read(client->ioc); > + } This is where the race from above becomes relevant. Let's first look at calling qio_channel_wake_read() a tiny bit too early: Without any locking in qio_channel_yield(), we could catch the read coroutine between setting ioc->read_coroutine and actually yielding. In this case we would call aio_co_wake() on a coroutine that is still running in a different thread. Since it's in a different thread, we only schedule it instead of entering it directly, and that just works. The coroutine will immediately be reentered, which is exactly what we want. Even earlier calls of qio_channel_wake_read() (i.e. between setting client->read_yielding and setting ioc->read_coroutine) don't actively hurt; they just don't do anything if no read is in flight. (This is the same case as if the nbd_trip() coroutine didn't even set client->read_yielding yet, just that the check you added above can't catch it.) So if nbd_read_eof() didn't yield yet, we don't wake it here, but we still return true, so the next drained_poll call will try again.
This is good in principle, but it depends on waking up the main thread when we make progress. So we have to call aio_wait_kick() between setting ioc->read_coroutine and yielding to make this work. What we may actually get indirectly is an aio_notify() through setting FD handlers, if all implementations of qio_channel_set_aio_fd_handler() actually do that. I suppose this could be enough? Anyway, if my result after thinking really hard about this is "I can't rule out that it's correct", maybe it would be better to just run this code in the export AioContext instead, so that we don't have to think about all the subtleties and can know that the nbd_trip() coroutine is at a yield point? > + > + return true; > } > - > - return true; > } > } > > @@ -1656,6 +1688,8 @@ static void nbd_eject_notifier(Notifier *n, void *data) > { > NBDExport *exp = container_of(n, NBDExport, eject_notifier); > > + assert(qemu_in_main_thread()); > + > blk_exp_request_shutdown(&exp->common); > } > > @@ -2541,7 +2575,6 @@ static int coroutine_fn > nbd_co_receive_request(NBDRequestData *req, > int ret; > > g_assert(qemu_in_coroutine()); > - assert(client->recv_coroutine == qemu_coroutine_self()); > ret = nbd_receive_request(client, request, errp); > if (ret < 0) { > return ret; > @@ -2950,7 +2983,11 @@ static coroutine_fn void nbd_trip(void *opaque) > */ > > trace_nbd_trip(); > + > + qemu_mutex_lock(&client->lock); > + > if (client->closing) { > + qemu_mutex_unlock(&client->lock); > aio_co_reschedule_self(qemu_get_aio_context()); > nbd_client_put(client); > return; > @@ -2961,15 +2998,24 @@ static coroutine_fn void nbd_trip(void *opaque) > * We're switching between AIO contexts. Don't attempt to receive a > new > * request and kick the main context which may be waiting for us.
> */ > - aio_co_reschedule_self(qemu_get_aio_context()); > - nbd_client_put(client); > client->recv_coroutine = NULL; > + qemu_mutex_unlock(&client->lock); > aio_wait_kick(); > + > + aio_co_reschedule_self(qemu_get_aio_context()); > + nbd_client_put(client); > return; > } > > req = nbd_request_get(client); > - ret = nbd_co_receive_request(req, &request, &local_err); > + > + do { > + assert(client->recv_coroutine == qemu_coroutine_self()); > + qemu_mutex_unlock(&client->lock); > + ret = nbd_co_receive_request(req, &request, &local_err); > + qemu_mutex_lock(&client->lock); > + } while (ret == -EAGAIN && !client->quiescing); I think this deserves a comment explaining that the loop exists only for the drain case without polling, where drained_end has already happened before we reach this point, so we must not terminate the coroutine any more because nothing would restart it. > client->recv_coroutine = NULL; As soon as we're past this, the nbd_client_receive_next_request() called by drained_end will create a new coroutine, so we don't have to be careful about the same case after this. Kevin