Am 21.12.2023 um 02:49 hat Stefan Hajnoczi geschrieben:
> NBDClient has a number of fields that are accessed by both the export
> AioContext and the main loop thread. When the AioContext lock is removed
> these fields will need another form of protection.
> 
> Add NBDClient->lock and protect fields that are accessed by both
> threads. Also add assertions where possible and otherwise add doc
> comments stating assumptions about which thread and lock holding.
> 
> Note this patch moves the client->recv_coroutine assertion from
> nbd_co_receive_request() to nbd_trip() where client->lock is held.
> 
> Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
> ---
>  nbd/server.c | 128 +++++++++++++++++++++++++++++++++++++--------------
>  1 file changed, 94 insertions(+), 34 deletions(-)
> 
> diff --git a/nbd/server.c b/nbd/server.c
> index 527fbdab4a..4008ec7df9 100644
> --- a/nbd/server.c
> +++ b/nbd/server.c
> @@ -125,23 +125,25 @@ struct NBDClient {
>      int refcount; /* atomic */
>      void (*close_fn)(NBDClient *client, bool negotiated);
>  
> +    QemuMutex lock;
> +
>      NBDExport *exp;
>      QCryptoTLSCreds *tlscreds;
>      char *tlsauthz;
>      QIOChannelSocket *sioc; /* The underlying data channel */
>      QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
>  
> -    Coroutine *recv_coroutine;
> +    Coroutine *recv_coroutine; /* protected by lock */
>  
>      CoMutex send_lock;
>      Coroutine *send_coroutine;
>  
> -    bool read_yielding;
> -    bool quiescing;
> +    bool read_yielding; /* protected by lock */
> +    bool quiescing; /* protected by lock */
>  
>      QTAILQ_ENTRY(NBDClient) next;
> -    int nb_requests;
> -    bool closing;
> +    int nb_requests; /* protected by lock */
> +    bool closing; /* protected by lock */
>  
>      uint32_t check_align; /* If non-zero, check for aligned client requests 
> */
>  
> @@ -1415,11 +1417,18 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t 
> size, Error **errp)
>  
>          len = qio_channel_readv(client->ioc, &iov, 1, errp);
>          if (len == QIO_CHANNEL_ERR_BLOCK) {
> -            client->read_yielding = true;
> +            WITH_QEMU_LOCK_GUARD(&client->lock) {
> +                if (client->quiescing) {
> +                    return -EAGAIN;
> +                }

Why did you add another client->quiescing check here?

If it is to address a race, I think you only made the window a bit
smaller, but between releasing the lock and yielding, the field could
still change, so drain needs to handle this case anyway.

> +                client->read_yielding = true;
> +            }
>              qio_channel_yield(client->ioc, G_IO_IN);
> -            client->read_yielding = false;
> -            if (client->quiescing) {
> -                return -EAGAIN;
> +            WITH_QEMU_LOCK_GUARD(&client->lock) {
> +                client->read_yielding = false;
> +                if (client->quiescing) {
> +                    return -EAGAIN;
> +                }
>              }
>              continue;
>          } else if (len < 0) {
> @@ -1528,6 +1537,7 @@ void nbd_client_put(NBDClient *client)
>              blk_exp_unref(&client->exp->common);
>          }
>          g_free(client->contexts.bitmaps);
> +        qemu_mutex_destroy(&client->lock);
>          g_free(client);
>      }
>  }
> @@ -1536,11 +1546,13 @@ static void client_close(NBDClient *client, bool 
> negotiated)
>  {
>      assert(qemu_in_main_thread());
>  
> -    if (client->closing) {
> -        return;
> -    }
> +    WITH_QEMU_LOCK_GUARD(&client->lock) {
> +        if (client->closing) {
> +            return;
> +        }
>  
> -    client->closing = true;
> +        client->closing = true;
> +    }
>  
>      /* Force requests to finish.  They will drop their own references,
>       * then we'll close the socket and free the NBDClient.
> @@ -1554,6 +1566,7 @@ static void client_close(NBDClient *client, bool 
> negotiated)
>      }
>  }
>  
> +/* Runs in export AioContext with client->lock held */
>  static NBDRequestData *nbd_request_get(NBDClient *client)
>  {
>      NBDRequestData *req;
> @@ -1566,6 +1579,7 @@ static NBDRequestData *nbd_request_get(NBDClient 
> *client)
>      return req;
>  }
>  
> +/* Runs in export AioContext with client->lock held */
>  static void nbd_request_put(NBDRequestData *req)
>  {
>      NBDClient *client = req->client;
> @@ -1589,14 +1603,18 @@ static void blk_aio_attached(AioContext *ctx, void 
> *opaque)
>      NBDExport *exp = opaque;
>      NBDClient *client;
>  
> +    assert(qemu_in_main_thread());
> +
>      trace_nbd_blk_aio_attached(exp->name, ctx);
>  
>      exp->common.ctx = ctx;
>  
>      QTAILQ_FOREACH(client, &exp->clients, next) {
> -        assert(client->nb_requests == 0);
> -        assert(client->recv_coroutine == NULL);
> -        assert(client->send_coroutine == NULL);
> +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> +            assert(client->nb_requests == 0);
> +            assert(client->recv_coroutine == NULL);
> +            assert(client->send_coroutine == NULL);
> +        }
>      }
>  }
>  
> @@ -1604,6 +1622,8 @@ static void blk_aio_detach(void *opaque)
>  {
>      NBDExport *exp = opaque;
>  
> +    assert(qemu_in_main_thread());
> +
>      trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
>  
>      exp->common.ctx = NULL;
> @@ -1614,8 +1634,12 @@ static void nbd_drained_begin(void *opaque)
>      NBDExport *exp = opaque;
>      NBDClient *client;
>  
> +    assert(qemu_in_main_thread());
> +
>      QTAILQ_FOREACH(client, &exp->clients, next) {
> -        client->quiescing = true;
> +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> +            client->quiescing = true;
> +        }
>      }
>  }
>  
> @@ -1624,9 +1648,13 @@ static void nbd_drained_end(void *opaque)
>      NBDExport *exp = opaque;
>      NBDClient *client;
>  
> +    assert(qemu_in_main_thread());
> +
>      QTAILQ_FOREACH(client, &exp->clients, next) {
> -        client->quiescing = false;
> -        nbd_client_receive_next_request(client);
> +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> +            client->quiescing = false;
> +            nbd_client_receive_next_request(client);
> +        }
>      }
>  }
>  
> @@ -1635,17 +1663,21 @@ static bool nbd_drained_poll(void *opaque)
>      NBDExport *exp = opaque;
>      NBDClient *client;
>  
> +    assert(qemu_in_main_thread());
> +
>      QTAILQ_FOREACH(client, &exp->clients, next) {
> -        if (client->nb_requests != 0) {
> -            /*
> -             * If there's a coroutine waiting for a request on nbd_read_eof()
> -             * enter it here so we don't depend on the client to wake it up.
> -             */
> -            if (client->recv_coroutine != NULL && client->read_yielding) {
> -                qio_channel_wake_read(client->ioc);
> +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> +            if (client->nb_requests != 0) {
> +                /*
> +                 * If there's a coroutine waiting for a request on 
> nbd_read_eof()
> +                 * enter it here so we don't depend on the client to wake it 
> up.
> +                 */
> +                if (client->recv_coroutine != NULL && client->read_yielding) 
> {
> +                    qio_channel_wake_read(client->ioc);
> +                }

This is where the race from above becomes relevant.

Let's first look at calling qio_channel_wake_read() a tiny bit too
early: Without any locking in qio_channel_yield(), we could catch the
read coroutine between setting ioc->read_coroutine and before actually
yielding. In this case we would call aio_co_wake() on a coroutine that
is still running in a different thread. Since it's in a different
thread, we only schedule it instead of entering it directly, and that just
works. The coroutine will immediately be reentered, which is exactly
what we want.

Even earlier calls of qio_channel_wake_read() (i.e. between setting
client->read_yielding and setting ioc->read_coroutine) don't actively
hurt, they just don't do anything if no read is in flight. (This is the
same case as if the nbd_trip() coroutine didn't even set
client->read_yielding yet, just that the check you added above can't
catch it.)

So if nbd_read_eof() didn't yield yet, we don't wake it here, but we
still return true, so the next drained_poll call will try again.

This is good in principle, but it depends on waking up the main thread
when we made progress. So we have to call aio_wait_kick() between
setting ioc->read_coroutine and yielding to make this work. What we
actually may get indirectly is an aio_notify() through setting FD
handlers if all implementations of qio_channel_set_aio_fd_handler()
actually do that. I suppose this could be enough?

Anyway, if my result after thinking really hard about this is "I can't
rule out that it's correct", maybe it would be better to just run this
code in the export AioContext instead so that we don't have to think
about all the subtleties and know that the nbd_trip() coroutine is at
a yield point?

> +
> +                return true;
>              }
> -
> -            return true;
>          }
>      }
>  
> @@ -1656,6 +1688,8 @@ static void nbd_eject_notifier(Notifier *n, void *data)
>  {
>      NBDExport *exp = container_of(n, NBDExport, eject_notifier);
>  
> +    assert(qemu_in_main_thread());
> +
>      blk_exp_request_shutdown(&exp->common);
>  }
>  
> @@ -2541,7 +2575,6 @@ static int coroutine_fn 
> nbd_co_receive_request(NBDRequestData *req,
>      int ret;
>  
>      g_assert(qemu_in_coroutine());
> -    assert(client->recv_coroutine == qemu_coroutine_self());
>      ret = nbd_receive_request(client, request, errp);
>      if (ret < 0) {
>          return ret;
> @@ -2950,7 +2983,11 @@ static coroutine_fn void nbd_trip(void *opaque)
>       */
>  
>      trace_nbd_trip();
> +
> +    qemu_mutex_lock(&client->lock);
> +
>      if (client->closing) {
> +        qemu_mutex_unlock(&client->lock);
>          aio_co_reschedule_self(qemu_get_aio_context());
>          nbd_client_put(client);
>          return;
> @@ -2961,15 +2998,24 @@ static coroutine_fn void nbd_trip(void *opaque)
>           * We're switching between AIO contexts. Don't attempt to receive a 
> new
>           * request and kick the main context which may be waiting for us.
>           */
> -        aio_co_reschedule_self(qemu_get_aio_context());
> -        nbd_client_put(client);
>          client->recv_coroutine = NULL;
> +        qemu_mutex_unlock(&client->lock);
>          aio_wait_kick();
> +
> +        aio_co_reschedule_self(qemu_get_aio_context());
> +        nbd_client_put(client);
>          return;
>      }
>  
>      req = nbd_request_get(client);
> -    ret = nbd_co_receive_request(req, &request, &local_err);
> +
> +    do {
> +        assert(client->recv_coroutine == qemu_coroutine_self());
> +        qemu_mutex_unlock(&client->lock);
> +        ret = nbd_co_receive_request(req, &request, &local_err);
> +        qemu_mutex_lock(&client->lock);
> +    } while (ret == -EAGAIN && !client->quiescing);

I think this deserves a comment to say that the loop is only about the
drain case without polling where drained_end has already happened before
we reach this point, so we must not terminate the coroutine any more
because nothing would restart it.

>      client->recv_coroutine = NULL;

As soon as we're past this, the nbd_client_receive_next_request() called
by drained_end will create a new coroutine, so we don't have to be
careful about the same case after this.

Kevin


Reply via email to