On Thu, Dec 21, 2023 at 11:45:36AM +0100, Kevin Wolf wrote:
> Am 21.12.2023 um 02:49 hat Stefan Hajnoczi geschrieben:
> > NBDClient has a number of fields that are accessed by both the export
> > AioContext and the main loop thread. When the AioContext lock is removed
> > these fields will need another form of protection.
> > 
> > Add NBDClient->lock and protect fields that are accessed by both
> > threads. Also add assertions where possible and otherwise add doc
> > comments stating assumptions about which thread and lock holding.
> > 
> > Note this patch moves the client->recv_coroutine assertion from
> > nbd_co_receive_request() to nbd_trip() where client->lock is held.
> > 
> > Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
> > ---
> >  nbd/server.c | 128 +++++++++++++++++++++++++++++++++++++--------------
> >  1 file changed, 94 insertions(+), 34 deletions(-)
> > 
> > diff --git a/nbd/server.c b/nbd/server.c
> > index 527fbdab4a..4008ec7df9 100644
> > --- a/nbd/server.c
> > +++ b/nbd/server.c
> > @@ -125,23 +125,25 @@ struct NBDClient {
> >      int refcount; /* atomic */
> >      void (*close_fn)(NBDClient *client, bool negotiated);
> >  
> > +    QemuMutex lock;
> > +
> >      NBDExport *exp;
> >      QCryptoTLSCreds *tlscreds;
> >      char *tlsauthz;
> >      QIOChannelSocket *sioc; /* The underlying data channel */
> >      QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
> >  
> > -    Coroutine *recv_coroutine;
> > +    Coroutine *recv_coroutine; /* protected by lock */
> >  
> >      CoMutex send_lock;
> >      Coroutine *send_coroutine;
> >  
> > -    bool read_yielding;
> > -    bool quiescing;
> > +    bool read_yielding; /* protected by lock */
> > +    bool quiescing; /* protected by lock */
> >  
> >      QTAILQ_ENTRY(NBDClient) next;
> > -    int nb_requests;
> > -    bool closing;
> > +    int nb_requests; /* protected by lock */
> > +    bool closing; /* protected by lock */
> >  
> >      uint32_t check_align; /* If non-zero, check for aligned client requests */
> >  
> > @@ -1415,11 +1417,18 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
> >  
> >          len = qio_channel_readv(client->ioc, &iov, 1, errp);
> >          if (len == QIO_CHANNEL_ERR_BLOCK) {
> > -            client->read_yielding = true;
> > +            WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +                if (client->quiescing) {
> > +                    return -EAGAIN;
> > +                }
> 
> Why did you add another client->quiescing check here?
> 
> If it is to address a race, I think you only made the window a bit
> smaller, but between releasing the lock and yielding the field could
> still change, so drain needs to handle this case anyway.
I added it for consistency/symmetry with nbd_trip(), which checks
client->quiescing after acquiring client->lock, but I didn't have a
specific scenario in mind. I'll drop it.

I agree that it does not prevent races: .drained_begin() +
.drained_poll() can run after client->lock is released and before
qio_channel_yield() takes effect. In that case we miss client->quiescing
and still have the race where no wake-up occurs because
qio_channel_wake_read() sees ioc->read_coroutine == NULL.
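Concretely, the interleaving I have in mind looks like this (a sketch,
with the drain machinery abbreviated):

  export AioContext (nbd_read_eof)      main loop thread (drain)
  --------------------------------      ------------------------
  lock client->lock
  client->quiescing is false
  client->read_yielding = true
  unlock client->lock
                                        .drained_begin():
                                          client->quiescing = true
                                        .drained_poll():
                                          read_yielding is true, but
                                          ioc->read_coroutine == NULL,
                                          so qio_channel_wake_read()
                                          wakes nobody
  qio_channel_yield():
    ioc->read_coroutine = self
    yield
  (coroutine sleeps with no wake-up pending)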
> > +                client->read_yielding = true;
> > +            }
> >              qio_channel_yield(client->ioc, G_IO_IN);
> > -            client->read_yielding = false;
> > -            if (client->quiescing) {
> > -                return -EAGAIN;
> > +            WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +                client->read_yielding = false;
> > +                if (client->quiescing) {
> > +                    return -EAGAIN;
> > +                }
> >              }
> >              continue;
> >          } else if (len < 0) {
> > @@ -1528,6 +1537,7 @@ void nbd_client_put(NBDClient *client)
> >              blk_exp_unref(&client->exp->common);
> >          }
> >          g_free(client->contexts.bitmaps);
> > +        qemu_mutex_destroy(&client->lock);
> >          g_free(client);
> >      }
> >  }
> > @@ -1536,11 +1546,13 @@ static void client_close(NBDClient *client, bool negotiated)
> >  {
> >      assert(qemu_in_main_thread());
> >  
> > -    if (client->closing) {
> > -        return;
> > -    }
> > +    WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +        if (client->closing) {
> > +            return;
> > +        }
> >  
> > -    client->closing = true;
> > +        client->closing = true;
> > +    }
> >  
> >      /* Force requests to finish. They will drop their own references,
> >       * then we'll close the socket and free the NBDClient.
> > @@ -1554,6 +1566,7 @@ static void client_close(NBDClient *client, bool negotiated)
> >      }
> >  }
> >  
> > +/* Runs in export AioContext with client->lock held */
> >  static NBDRequestData *nbd_request_get(NBDClient *client)
> >  {
> >      NBDRequestData *req;
> > @@ -1566,6 +1579,7 @@ static NBDRequestData *nbd_request_get(NBDClient *client)
> >      return req;
> >  }
> >  
> > +/* Runs in export AioContext with client->lock held */
> >  static void nbd_request_put(NBDRequestData *req)
> >  {
> >      NBDClient *client = req->client;
> > @@ -1589,14 +1603,18 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
> >      NBDExport *exp = opaque;
> >      NBDClient *client;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      trace_nbd_blk_aio_attached(exp->name, ctx);
> >  
> >      exp->common.ctx = ctx;
> >  
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> > -        assert(client->nb_requests == 0);
> > -        assert(client->recv_coroutine == NULL);
> > -        assert(client->send_coroutine == NULL);
> > +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +            assert(client->nb_requests == 0);
> > +            assert(client->recv_coroutine == NULL);
> > +            assert(client->send_coroutine == NULL);
> > +        }
> >      }
> >  }
> >  
> > @@ -1604,6 +1622,8 @@ static void blk_aio_detach(void *opaque)
> >  {
> >      NBDExport *exp = opaque;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
> >  
> >      exp->common.ctx = NULL;
> > @@ -1614,8 +1634,12 @@ static void nbd_drained_begin(void *opaque)
> >  {
> >      NBDExport *exp = opaque;
> >      NBDClient *client;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> > -        client->quiescing = true;
> > +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +            client->quiescing = true;
> > +        }
> >      }
> >  }
> >  
> > @@ -1624,9 +1648,13 @@ static void nbd_drained_end(void *opaque)
> >  {
> >      NBDExport *exp = opaque;
> >      NBDClient *client;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> > -        client->quiescing = false;
> > -        nbd_client_receive_next_request(client);
> > +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +            client->quiescing = false;
> > +            nbd_client_receive_next_request(client);
> > +        }
> >      }
> >  }
> >  
> > @@ -1635,17 +1663,21 @@ static bool nbd_drained_poll(void *opaque)
> >  {
> >      NBDExport *exp = opaque;
> >      NBDClient *client;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> > -        if (client->nb_requests != 0) {
> > -            /*
> > -             * If there's a coroutine waiting for a request on nbd_read_eof()
> > -             * enter it here so we don't depend on the client to wake it up.
> > -             */
> > -            if (client->recv_coroutine != NULL && client->read_yielding) {
> > -                qio_channel_wake_read(client->ioc);
> > +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +            if (client->nb_requests != 0) {
> > +                /*
> > +                 * If there's a coroutine waiting for a request on nbd_read_eof()
> > +                 * enter it here so we don't depend on the client to wake it up.
> > +                 */
> > +                if (client->recv_coroutine != NULL && client->read_yielding) {
> > +                    qio_channel_wake_read(client->ioc);
> > +                }
> 
> This is where the race from above becomes relevant.
> 
> Let's first look at calling qio_channel_wake_read() a tiny bit too
> early: Without any locking in qio_channel_yield(), we could catch the
> read coroutine between setting ioc->read_coroutine and actually
> yielding. In this case we would call aio_co_wake() on a coroutine that
> is still running in a different thread. Since it's in a different
> thread, we only schedule it instead of entering it directly, and that
> just works. The coroutine will immediately be reentered, which is
> exactly what we want.
> 
> Even earlier calls of qio_channel_wake_read() (i.e. between setting
> client->read_yielding and setting ioc->read_coroutine) don't actively
> hurt, they just don't do anything if no read is in flight. (This is the
> same case as if the nbd_trip() coroutine didn't even set
> client->read_yielding yet, just that the check you added above can't
> catch it.)
> 
> So if nbd_read_eof() didn't yield yet, we don't wake it here, but we
> still return true, so the next drained_poll call will try again.
> 
> This is good in principle, but it depends on waking up the main thread
> when we made progress. So we have to call aio_wait_kick() between
> setting ioc->read_coroutine and yielding to make this work. What we
> actually may get indirectly is an aio_notify() through setting FD
> handlers, if all implementations of qio_channel_set_aio_fd_handler()
> actually do that. I suppose this could be enough?

qio_channel_set_aio_fd_handler() calls aio_notify() on the export's
AioContext. It does not wake the main loop AioContext when an IOThread
is being used, so I don't think it helps here.

The state where qio_channel_wake_read() misses that nbd_trip() is
yielding looks like this:

  client->nb_requests > 0
  client->recv_coroutine = the nbd_trip() coroutine
  client->quiescing = true
  client->read_yielding = true
  ioc->read_coroutine = NULL

The main loop thread is waiting for activity while nbd_trip() enters
qemu_coroutine_yield(). There is no progress until the main loop thread
resumes (which can also be triggered by the export AioContext completing
NBD I/O). I guess the race isn't immediately apparent because there is
usually some event loop activity that hides the problem.
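If we went the aio_wait_kick() route, I understand the suggestion as
something like the following. This is a simplified sketch of
qio_channel_yield() from io/channel.c (the G_IO_OUT path and the handler
cleanup are elided), with the kick placed where you suggested; the
aio_wait_kick() call is the hypothetical addition:

    void coroutine_fn qio_channel_yield(QIOChannel *ioc, GIOCondition condition)
    {
        assert(qemu_in_coroutine());

        if (condition == G_IO_IN) {
            ioc->read_coroutine = qemu_coroutine_self();
        }
        qio_channel_set_aio_fd_handlers(ioc);

        /*
         * Hypothetical addition: kick AIO_WAIT_WHILE() in the main loop
         * so that the next nbd_drained_poll() retry runs after
         * ioc->read_coroutine is set and qio_channel_wake_read() can
         * find this coroutine.
         */
        aio_wait_kick();

        qemu_coroutine_yield();

        /* fd handler cleanup elided */
    }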
> Anyway, if my result after thinking really hard about this is "I can't
> rule out that it's correct", maybe it would be better to just run this
> code in the export AioContext instead so that we don't have to think
> about all the subtleties and know that the nbd_co_trip() coroutine is at
> a yield point?

Agreed.
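For the next revision I'm thinking of something along these lines. This
is only a rough sketch: nbd_wake_read_bh() is a hypothetical helper, and
reference counting and locking details are elided. The wake-up is
bounced into the export AioContext with a one-shot BH:

    /*
     * Hypothetical helper: runs in the export AioContext. There the
     * recv_coroutine is either fully yielded in qio_channel_yield() or
     * not registered in ioc->read_coroutine at all, never in between.
     */
    static void nbd_wake_read_bh(void *opaque)
    {
        NBDClient *client = opaque;

        qio_channel_wake_read(client->ioc);
    }

nbd_drained_poll() would then schedule the BH instead of calling
qio_channel_wake_read() directly from the main loop thread:

    if (client->recv_coroutine != NULL && client->read_yielding) {
        aio_bh_schedule_oneshot(client->exp->common.ctx,
                                nbd_wake_read_bh, client);
    }

The point is that the BH executes in the export AioContext, so it cannot
interleave with the coroutine between setting ioc->read_coroutine and
yielding: the coroutine only returns control to that event loop once the
yield is complete.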
> > +
> > +                return true;
> >              }
> > -
> > -            return true;
> >          }
> >      }
> >  
> > @@ -1656,6 +1688,8 @@ static void nbd_eject_notifier(Notifier *n, void *data)
> >  {
> >      NBDExport *exp = container_of(n, NBDExport, eject_notifier);
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      blk_exp_request_shutdown(&exp->common);
> >  }
> >  
> > @@ -2541,7 +2575,6 @@ static int coroutine_fn nbd_co_receive_request(NBDRequestData *req,
> >      int ret;
> >  
> >      g_assert(qemu_in_coroutine());
> > -    assert(client->recv_coroutine == qemu_coroutine_self());
> >      ret = nbd_receive_request(client, request, errp);
> >      if (ret < 0) {
> >          return ret;
> > @@ -2950,7 +2983,11 @@ static coroutine_fn void nbd_trip(void *opaque)
> >       */
> >  
> >      trace_nbd_trip();
> > +
> > +    qemu_mutex_lock(&client->lock);
> > +
> >      if (client->closing) {
> > +        qemu_mutex_unlock(&client->lock);
> >          aio_co_reschedule_self(qemu_get_aio_context());
> >          nbd_client_put(client);
> >          return;
> > @@ -2961,15 +2998,24 @@ static coroutine_fn void nbd_trip(void *opaque)
> >           * We're switching between AIO contexts. Don't attempt to receive a new
> >           * request and kick the main context which may be waiting for us.
> >           */
> > -        aio_co_reschedule_self(qemu_get_aio_context());
> > -        nbd_client_put(client);
> >          client->recv_coroutine = NULL;
> > +        qemu_mutex_unlock(&client->lock);
> >          aio_wait_kick();
> > +
> > +        aio_co_reschedule_self(qemu_get_aio_context());
> > +        nbd_client_put(client);
> >          return;
> >      }
> >  
> >      req = nbd_request_get(client);
> > -    ret = nbd_co_receive_request(req, &request, &local_err);
> > +
> > +    do {
> > +        assert(client->recv_coroutine == qemu_coroutine_self());
> > +        qemu_mutex_unlock(&client->lock);
> > +        ret = nbd_co_receive_request(req, &request, &local_err);
> > +        qemu_mutex_lock(&client->lock);
> > +    } while (ret == -EAGAIN && !client->quiescing);

> I think this deserves a comment to say that the loop is only about the
> drain case without polling where drained_end has already happened before
> we reach this point, so we may not terminate the coroutine any more
> because nothing would restart it.

Sounds good, I'll add a comment in the next revision.
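Roughly along these lines (exact wording to be decided):

    /*
     * nbd_co_receive_request() returns -EAGAIN when it is interrupted by
     * drain. If the drained section has already ended by the time we get
     * here (!client->quiescing), nothing would restart this coroutine,
     * so we must not terminate it. Retry receiving the request instead.
     */
    do {
        assert(client->recv_coroutine == qemu_coroutine_self());
        qemu_mutex_unlock(&client->lock);
        ret = nbd_co_receive_request(req, &request, &local_err);
        qemu_mutex_lock(&client->lock);
    } while (ret == -EAGAIN && !client->quiescing);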
> >      client->recv_coroutine = NULL;
> As soon as we're past this, the nbd_client_receive_next_request() called
> by drained_end will create a new coroutine, so we don't have to be
> careful about the same case after this.
> 
> Kevin