On Thu, Dec 21, 2023 at 11:45:36AM +0100, Kevin Wolf wrote:
> Am 21.12.2023 um 02:49 hat Stefan Hajnoczi geschrieben:
> > NBDClient has a number of fields that are accessed by both the export
> > AioContext and the main loop thread. When the AioContext lock is removed
> > these fields will need another form of protection.
> > 
> > Add NBDClient->lock and protect fields that are accessed by both
> > threads. Also add assertions where possible and otherwise add doc
> > comments stating assumptions about which thread runs the code and which
> > locks must be held.
> > 
> > Note this patch moves the client->recv_coroutine assertion from
> > nbd_co_receive_request() to nbd_trip() where client->lock is held.
> > 
> > Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
> > ---
> >  nbd/server.c | 128 +++++++++++++++++++++++++++++++++++++--------------
> >  1 file changed, 94 insertions(+), 34 deletions(-)
> > 
> > diff --git a/nbd/server.c b/nbd/server.c
> > index 527fbdab4a..4008ec7df9 100644
> > --- a/nbd/server.c
> > +++ b/nbd/server.c
> > @@ -125,23 +125,25 @@ struct NBDClient {
> >      int refcount; /* atomic */
> >      void (*close_fn)(NBDClient *client, bool negotiated);
> >  
> > +    QemuMutex lock;
> > +
> >      NBDExport *exp;
> >      QCryptoTLSCreds *tlscreds;
> >      char *tlsauthz;
> >      QIOChannelSocket *sioc; /* The underlying data channel */
> >      QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) 
> > */
> >  
> > -    Coroutine *recv_coroutine;
> > +    Coroutine *recv_coroutine; /* protected by lock */
> >  
> >      CoMutex send_lock;
> >      Coroutine *send_coroutine;
> >  
> > -    bool read_yielding;
> > -    bool quiescing;
> > +    bool read_yielding; /* protected by lock */
> > +    bool quiescing; /* protected by lock */
> >  
> >      QTAILQ_ENTRY(NBDClient) next;
> > -    int nb_requests;
> > -    bool closing;
> > +    int nb_requests; /* protected by lock */
> > +    bool closing; /* protected by lock */
> >  
> >      uint32_t check_align; /* If non-zero, check for aligned client 
> > requests */
> >  
> > @@ -1415,11 +1417,18 @@ nbd_read_eof(NBDClient *client, void *buffer, 
> > size_t size, Error **errp)
> >  
> >          len = qio_channel_readv(client->ioc, &iov, 1, errp);
> >          if (len == QIO_CHANNEL_ERR_BLOCK) {
> > -            client->read_yielding = true;
> > +            WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +                if (client->quiescing) {
> > +                    return -EAGAIN;
> > +                }
> 
> Why did you add another client->quiescing check here?
> 
> If it is to address a race, I think you only made the window a bit
> smaller, but between releasing the lock and yielding the field could
> still change, so drain needs to handle this case anyway.

I added it for consistency/symmetry where nbd_trip() checks
client->quiescing after acquiring client->lock but didn't have any
specific scenario in mind. I'll drop this.

I agree that it does not prevent races. .drained_begin() +
.drained_poll() can run after client->lock is released and before
qio_channel_yield() takes effect. In that case we miss client->quiescing
and still have the race where no wake occurs because
qio_channel_wake_read() sees ioc->read_coroutine == NULL.

> > +                client->read_yielding = true;
> > +            }
> >              qio_channel_yield(client->ioc, G_IO_IN);
> > -            client->read_yielding = false;
> > -            if (client->quiescing) {
> > -                return -EAGAIN;
> > +            WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +                client->read_yielding = false;
> > +                if (client->quiescing) {
> > +                    return -EAGAIN;
> > +                }
> >              }
> >              continue;
> >          } else if (len < 0) {
> > @@ -1528,6 +1537,7 @@ void nbd_client_put(NBDClient *client)
> >              blk_exp_unref(&client->exp->common);
> >          }
> >          g_free(client->contexts.bitmaps);
> > +        qemu_mutex_destroy(&client->lock);
> >          g_free(client);
> >      }
> >  }
> > @@ -1536,11 +1546,13 @@ static void client_close(NBDClient *client, bool 
> > negotiated)
> >  {
> >      assert(qemu_in_main_thread());
> >  
> > -    if (client->closing) {
> > -        return;
> > -    }
> > +    WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +        if (client->closing) {
> > +            return;
> > +        }
> >  
> > -    client->closing = true;
> > +        client->closing = true;
> > +    }
> >  
> >      /* Force requests to finish.  They will drop their own references,
> >       * then we'll close the socket and free the NBDClient.
> > @@ -1554,6 +1566,7 @@ static void client_close(NBDClient *client, bool 
> > negotiated)
> >      }
> >  }
> >  
> > +/* Runs in export AioContext with client->lock held */
> >  static NBDRequestData *nbd_request_get(NBDClient *client)
> >  {
> >      NBDRequestData *req;
> > @@ -1566,6 +1579,7 @@ static NBDRequestData *nbd_request_get(NBDClient 
> > *client)
> >      return req;
> >  }
> >  
> > +/* Runs in export AioContext with client->lock held */
> >  static void nbd_request_put(NBDRequestData *req)
> >  {
> >      NBDClient *client = req->client;
> > @@ -1589,14 +1603,18 @@ static void blk_aio_attached(AioContext *ctx, void 
> > *opaque)
> >      NBDExport *exp = opaque;
> >      NBDClient *client;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      trace_nbd_blk_aio_attached(exp->name, ctx);
> >  
> >      exp->common.ctx = ctx;
> >  
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> > -        assert(client->nb_requests == 0);
> > -        assert(client->recv_coroutine == NULL);
> > -        assert(client->send_coroutine == NULL);
> > +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +            assert(client->nb_requests == 0);
> > +            assert(client->recv_coroutine == NULL);
> > +            assert(client->send_coroutine == NULL);
> > +        }
> >      }
> >  }
> >  
> > @@ -1604,6 +1622,8 @@ static void blk_aio_detach(void *opaque)
> >  {
> >      NBDExport *exp = opaque;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
> >  
> >      exp->common.ctx = NULL;
> > @@ -1614,8 +1634,12 @@ static void nbd_drained_begin(void *opaque)
> >      NBDExport *exp = opaque;
> >      NBDClient *client;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> > -        client->quiescing = true;
> > +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +            client->quiescing = true;
> > +        }
> >      }
> >  }
> >  
> > @@ -1624,9 +1648,13 @@ static void nbd_drained_end(void *opaque)
> >      NBDExport *exp = opaque;
> >      NBDClient *client;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> > -        client->quiescing = false;
> > -        nbd_client_receive_next_request(client);
> > +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +            client->quiescing = false;
> > +            nbd_client_receive_next_request(client);
> > +        }
> >      }
> >  }
> >  
> > @@ -1635,17 +1663,21 @@ static bool nbd_drained_poll(void *opaque)
> >      NBDExport *exp = opaque;
> >      NBDClient *client;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> > -        if (client->nb_requests != 0) {
> > -            /*
> > -             * If there's a coroutine waiting for a request on 
> > nbd_read_eof()
> > -             * enter it here so we don't depend on the client to wake it 
> > up.
> > -             */
> > -            if (client->recv_coroutine != NULL && client->read_yielding) {
> > -                qio_channel_wake_read(client->ioc);
> > +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +            if (client->nb_requests != 0) {
> > +                /*
> > +                 * If there's a coroutine waiting for a request on 
> > nbd_read_eof()
> > +                 * enter it here so we don't depend on the client to wake 
> > it up.
> > +                 */
> > +                if (client->recv_coroutine != NULL && 
> > client->read_yielding) {
> > +                    qio_channel_wake_read(client->ioc);
> > +                }
> 
> This is where the race from above becomes relevant.
> 
> Let's first look at calling qio_channel_wake_read() a tiny bit too
> early: Without any locking in qio_channel_yield(), we could catch the
> read coroutine between setting ioc->read_coroutine and actually
> yielding. In this case we would call aio_co_wake() on a coroutine that
> is still running in a different thread. Since it's in a different
> thread, we only schedule it instead of entering it directly, and that just
> works. The coroutine will immediately be reentered, which is exactly
> what we want.
> 
> Even earlier calls of qio_channel_wake_read() (i.e. between setting
> client->read_yielding and setting ioc->read_coroutine) don't actively
> hurt, they just don't do anything if no read is in flight. (This is the
> same case as if the nbd_trip() coroutine didn't even set
> client->read_yielding yet, just that the check you added above can't
> catch it.)
> 
> So if nbd_read_eof() didn't yield yet, we don't wake it here, but we
> still return true, so the next drained_poll call will try again.
> 
> This is good in principle, but it depends on waking up the main thread
> when we made progress. So we have to call aio_wait_kick() between
> setting ioc->read_coroutine and yielding to make this work. What we
> actually may get indirectly is an aio_notify() through setting FD
> handlers if all implementations of qio_channel_set_aio_fd_handler()
> actually do that. I suppose this could be enough?

qio_channel_set_aio_fd_handler() calls aio_notify() on the export's
AioContext. It does not wake the main loop AioContext when an IOThread
is being used, so I don't think it helps here.

The state where qio_channel_wake_read() misses that nbd_trip() is
yielding looks like this:

client->nb_requests > 0
client->recv_coroutine = nbd_trip() coroutine
client->quiescing = true
client->read_yielding = true
ioc->read_coroutine = NULL

The main loop thread is waiting for activity and nbd_trip() enters
qemu_coroutine_yield(). There is no progress until the main loop thread
resumes (which can be triggered by the export AioContext completing NBD
I/O too).

I guess the race isn't immediately apparent because there is usually
some event loop activity that hides the problem.

> Anyway, if my result after thinking really hard about this is "I can't
> rule out that it's correct", maybe it would be better to just run this
> code in the export AioContext instead so that we don't have to think
> about all the subtleties and know that the nbd_co_trip() coroutine is at
> a yield point?

Agreed.

> > +
> > +                return true;
> >              }
> > -
> > -            return true;
> >          }
> >      }
> >  
> > @@ -1656,6 +1688,8 @@ static void nbd_eject_notifier(Notifier *n, void 
> > *data)
> >  {
> >      NBDExport *exp = container_of(n, NBDExport, eject_notifier);
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      blk_exp_request_shutdown(&exp->common);
> >  }
> >  
> > @@ -2541,7 +2575,6 @@ static int coroutine_fn 
> > nbd_co_receive_request(NBDRequestData *req,
> >      int ret;
> >  
> >      g_assert(qemu_in_coroutine());
> > -    assert(client->recv_coroutine == qemu_coroutine_self());
> >      ret = nbd_receive_request(client, request, errp);
> >      if (ret < 0) {
> >          return ret;
> > @@ -2950,7 +2983,11 @@ static coroutine_fn void nbd_trip(void *opaque)
> >       */
> >  
> >      trace_nbd_trip();
> > +
> > +    qemu_mutex_lock(&client->lock);
> > +
> >      if (client->closing) {
> > +        qemu_mutex_unlock(&client->lock);
> >          aio_co_reschedule_self(qemu_get_aio_context());
> >          nbd_client_put(client);
> >          return;
> > @@ -2961,15 +2998,24 @@ static coroutine_fn void nbd_trip(void *opaque)
> >           * We're switching between AIO contexts. Don't attempt to receive 
> > a new
> >           * request and kick the main context which may be waiting for us.
> >           */
> > -        aio_co_reschedule_self(qemu_get_aio_context());
> > -        nbd_client_put(client);
> >          client->recv_coroutine = NULL;
> > +        qemu_mutex_unlock(&client->lock);
> >          aio_wait_kick();
> > +
> > +        aio_co_reschedule_self(qemu_get_aio_context());
> > +        nbd_client_put(client);
> >          return;
> >      }
> >  
> >      req = nbd_request_get(client);
> > -    ret = nbd_co_receive_request(req, &request, &local_err);
> > +
> > +    do {
> > +        assert(client->recv_coroutine == qemu_coroutine_self());
> > +        qemu_mutex_unlock(&client->lock);
> > +        ret = nbd_co_receive_request(req, &request, &local_err);
> > +        qemu_mutex_lock(&client->lock);
> > +    } while (ret == -EAGAIN && !client->quiescing);
> 
> I think this deserves a comment to say that the loop is only about the
> drain case without polling where drained_end has already happened before
> we reach this point, so we may not terminate the coroutine any more
> because nothing would restart it.

Sounds good, I'll add a comment in the next revision.

> >      client->recv_coroutine = NULL;
> 
> As soon as we're past this, the nbd_client_receive_next_request() called
> by drained_end will create a new coroutine, so we don't have to be
> careful about the same case after this.
> 
> Kevin
> 

Attachment: signature.asc
Description: PGP signature

Reply via email to