On Thu, Dec 21, 2023 at 08:23:15AM +0100, Paolo Bonzini wrote:
> On 12/21/23 02:49, Stefan Hajnoczi wrote:
> > The NBD clients list is currently accessed from both the export
> > AioContext and the main loop thread. When the AioContext lock is removed
> > there will be nothing protecting the clients list.
> > 
> > Adding a lock around the clients list is tricky because NBDClient
> > structs are refcounted and may be freed from the export AioContext or
> > the main loop thread. nbd_export_request_shutdown() -> client_close() ->
> > nbd_client_put() is also tricky because the list lock would be held
> > while indirectly dropping references to NBDClients.
> > 
> > A simpler approach is to only allow nbd_client_put() and client_close()
> > calls from the main loop thread. Then the NBD clients list is only
> > accessed from the main loop thread and no fancy locking is needed.
> > 
> > nbd_trip() just needs to reschedule itself in the main loop AioContext
> > before calling nbd_client_put() and client_close(). This costs more CPU
> > cycles per NBD request but is needed for thread-safety when the
> > AioContext lock is removed.
> > 
> > Note that nbd_client_get() can still be called from either thread, so
> > make NBDClient->refcount atomic.
> > 
> > Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
> > ---
> >   nbd/server.c | 23 ++++++++++++++++++++---
> >   1 file changed, 20 insertions(+), 3 deletions(-)
> > 
> > diff --git a/nbd/server.c b/nbd/server.c
> > index 0b09ccc8dc..527fbdab4a 100644
> > --- a/nbd/server.c
> > +++ b/nbd/server.c
> > @@ -122,7 +122,7 @@ struct NBDMetaContexts {
> >   };
> >   struct NBDClient {
> > -    int refcount;
> > +    int refcount; /* atomic */
> >       void (*close_fn)(NBDClient *client, bool negotiated);
> >       NBDExport *exp;
> > @@ -1501,14 +1501,17 @@ static int coroutine_fn nbd_receive_request(NBDClient *client, NBDRequest *reque
> >   #define MAX_NBD_REQUESTS 16
> > +/* Runs in export AioContext and main loop thread */
> >   void nbd_client_get(NBDClient *client)
> >   {
> > -    client->refcount++;
> > +    qatomic_inc(&client->refcount);
> >   }
> >   void nbd_client_put(NBDClient *client)
> >   {
> > -    if (--client->refcount == 0) {
> > +    assert(qemu_in_main_thread());
> > +
> > +    if (qatomic_fetch_dec(&client->refcount) == 1) {
> >           /* The last reference should be dropped by client->close,
> >            * which is called by client_close.
> >            */
> > @@ -1531,6 +1534,8 @@ void nbd_client_put(NBDClient *client)
> >   static void client_close(NBDClient *client, bool negotiated)
> >   {
> > +    assert(qemu_in_main_thread());
> > +
> >       if (client->closing) {
> >           return;
> >       }
> > @@ -2938,8 +2943,15 @@ static coroutine_fn void nbd_trip(void *opaque)
> >       int ret;
> >       Error *local_err = NULL;
> > +    /*
> > +     * Note that nbd_client_put() and client_close() must be called from the
> > +     * main loop thread. Use aio_co_reschedule_self() to switch AioContext
> > +     * before calling these functions.
> > +     */
> > +
> >       trace_nbd_trip();
> >       if (client->closing) {
> > +        aio_co_reschedule_self(qemu_get_aio_context());
> >           nbd_client_put(client);
> >           return;
> >       }
> > @@ -2949,6 +2961,7 @@ static coroutine_fn void nbd_trip(void *opaque)
> >            * We're switching between AIO contexts. Don't attempt to receive a new
> >            * request and kick the main context which may be waiting for us.
> >            */
> > +        aio_co_reschedule_self(qemu_get_aio_context());
> >           nbd_client_put(client);
> >           client->recv_coroutine = NULL;
> >           aio_wait_kick();
> > @@ -3013,6 +3026,8 @@ static coroutine_fn void nbd_trip(void *opaque)
> >       qio_channel_set_cork(client->ioc, false);
> >   done:
> >       nbd_request_put(req);
> > +
> > +    aio_co_reschedule_self(qemu_get_aio_context());
> >       nbd_client_put(client);
> >       return;
> 
> This is very expensive to do on every NBD receive, considering that the
> final reference drop can really only happen when the client is closing
> (see the assertion in nbd_client_put).
> 
> In Linux there is a common pattern of "if the refcount could go to zero,
> take a lock before doing the decrement"; see refcount_dec_and_lock(). As
> an illustrative sketch of that kernel-side idiom (with hypothetical obj
> and list_lock names):
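> 
>     /* Take list_lock only when this might be the final reference;
>      * refcount_dec_and_lock() returns true with the lock held. */
>     if (refcount_dec_and_lock(&obj->ref, &list_lock)) {
>         list_del(&obj->node);
>         spin_unlock(&list_lock);
>         kfree(obj);
>     }
> 
> We can do something similar with "if refcount could go to zero, move to
> the main iothread before doing the decrement":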

Nice suggestion, thanks!

> 
> diff --git a/nbd/server.c b/nbd/server.c
> index 895cf0a7525..aec306923d8 100644
> --- a/nbd/server.c
> +++ b/nbd/server.c
> @@ -1529,6 +1529,21 @@ void nbd_client_put(NBDClient *client)
>      }
>  }
> +static bool nbd_client_put_nonzero(NBDClient *client)
> +{
> +    int old = qatomic_read(&client->refcount);
> +    int expected;
> +
> +    do {
> +        if (old == 1) {
> +            return false;
> +        }
> +        expected = old;
> +        old = qatomic_cmpxchg(&client->refcount, expected, expected - 1);
> +    } while (old != expected);
> +    return true;
> +}
> +
>  static void client_close(NBDClient *client, bool negotiated)
>  {
>      if (client->closing) {
> @@ -2936,15 +2951,14 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
>  static coroutine_fn void nbd_trip(void *opaque)
>  {
>      NBDClient *client = opaque;
> -    NBDRequestData *req;
> +    NBDRequestData *req = NULL;
>      NBDRequest request = { 0 };    /* GCC thinks it can be used uninitialized */
>      int ret;
>      Error *local_err = NULL;
>      trace_nbd_trip();
>      if (client->closing) {
> -        nbd_client_put(client);
> -        return;
> +        goto done;
>      }
>      if (client->quiescing) {
> @@ -2952,10 +2966,9 @@ static coroutine_fn void nbd_trip(void *opaque)
>           * We're switching between AIO contexts. Don't attempt to receive a new
>           * request and kick the main context which may be waiting for us.
>           */
> -        nbd_client_put(client);
>          client->recv_coroutine = NULL;
>          aio_wait_kick();
> -        return;
> +        goto done;
>      }
>      req = nbd_request_get(client);
> @@ -3015,8 +3028,13 @@ static coroutine_fn void nbd_trip(void *opaque)
>      qio_channel_set_cork(client->ioc, false);
>  done:
> -    nbd_request_put(req);
> -    nbd_client_put(client);
> +    if (req) {
> +        nbd_request_put(req);
> +    }
> +    if (!nbd_client_put_nonzero(client)) {
> +        aio_co_reschedule_self(qemu_get_aio_context());
> +        nbd_client_put(client);
> +    }
>      return;
>  disconnect:
> 
> I think adding the "if (req)" should also simplify the addition of the
> lock a bit.
> 
> Paolo
> 

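As a side note, the decrement-unless-last loop above maps directly onto C11
atomics. Here is a standalone sketch (not QEMU code; note that QEMU's
qatomic_cmpxchg() returns the previously-stored value, while
atomic_compare_exchange_strong() reports it through its "expected"
out-parameter instead):

    #include <stdatomic.h>
    #include <stdbool.h>

    /* Drop a reference unless it is the last one. Returns false when the
     * caller must switch to the slow path and drop the final reference
     * there. */
    static bool put_nonzero(atomic_int *refcount)
    {
        int old = atomic_load(refcount);

        do {
            if (old == 1) {
                return false;
            }
            /* On failure, 'old' is updated to the current value. */
        } while (!atomic_compare_exchange_strong(refcount, &old, old - 1));

        return true;
    }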