On Wed, Aug 23, 2023 at 07:45:04PM -0400, Stefan Hajnoczi wrote:
> The ongoing QEMU multi-queue block layer effort makes it possible for
> multiple threads to process I/O in parallel. The nbd block driver is not
> compatible with the multi-queue block layer yet because QIOChannel cannot
> be used easily from coroutines running in multiple threads. This series
> changes the QIOChannel API to make that possible.
> 
> In the current API, calling qio_channel_attach_aio_context() sets the
> AioContext where qio_channel_yield() installs an fd handler prior to yielding:
> 
>   qio_channel_attach_aio_context(ioc, my_ctx);
>   ...
>   qio_channel_yield(ioc); // my_ctx is used here
>   ...
>   qio_channel_detach_aio_context(ioc);
> 
> This API design has limitations: reading and writing must be done in the
> same AioContext, and moving between AioContexts involves a cumbersome
> sequence of API calls that is not suitable for doing on a per-request
> basis.
> 
> There is no fundamental reason why a QIOChannel needs to run within the
> same AioContext every time qio_channel_yield() is called. QIOChannel
> only uses the AioContext while inside qio_channel_yield(). The rest of
> the time, QIOChannel is independent of any AioContext.
> 
> In the new API, qio_channel_yield() queries the AioContext from the
> current coroutine using qemu_coroutine_get_aio_context(). There is no
> need to explicitly attach/detach AioContexts anymore;
> qio_channel_attach_aio_context() and qio_channel_detach_aio_context()
> are gone. One coroutine can read from the QIOChannel while another
> coroutine writes from a different AioContext.
> 
> This API change allows the nbd block driver to use QIOChannel from any thread.
> It's important to keep in mind that the block driver already synchronizes
> QIOChannel access and ensures that two coroutines never read simultaneously or
> write simultaneously.
> 
> This patch updates all users of qio_channel_attach_aio_context() to the
> new API. Most conversions are simple, but vhost-user-server requires a
> new qemu_coroutine_yield() call to quiesce the vu_client_trip()
> coroutine when not attached to any AioContext.
> 
> While the API has become simpler, there is one wart: QIOChannel has a
> special case for the iohandler AioContext (used for handlers that must
> not run in nested event loops). I didn't find an elegant way to preserve
> that behavior, so I added a new API called
> qio_channel_set_follow_coroutine_ctx(ioc, true|false) for opting in to
> the new AioContext model. By default QIOChannel uses the iohandler
> AioContext. Code that formerly called qio_channel_attach_aio_context()
> now calls qio_channel_set_follow_coroutine_ctx(ioc, true) once after the
> QIOChannel is created.

I wonder if it is better to just pass the AioContext object into
qio_channel_yield() explicitly, e.g. have

  qio_channel_yield(QIOChannel *ioc,
                    AioContext *ctx,
                    GIOCondition cond);

With the semantics that if 'ctx == NULL' we assume the default global
iohandler context, and for a non-default context it must be non-NULL?

That could nicely de-couple the API from relying on global
coroutine/thread state for querying an AioContext, which
makes it easier to reason about IMHO.

> 
> Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
> ---
>  include/io/channel.h             |  34 +++++++--
>  include/qemu/vhost-user-server.h |   1 +
>  block/nbd.c                      |  11 +--
>  io/channel-command.c             |  13 +++-
>  io/channel-file.c                |  18 ++++-
>  io/channel-null.c                |   3 +-
>  io/channel-socket.c              |  18 ++++-
>  io/channel-tls.c                 |   6 +-
>  io/channel.c                     | 120 ++++++++++++++++++++++---------
>  migration/channel-block.c        |   3 +-
>  nbd/client.c                     |   2 +-
>  nbd/server.c                     |  14 +---
>  scsi/qemu-pr-helper.c            |   4 +-
>  util/vhost-user-server.c         |  27 +++++--
>  14 files changed, 191 insertions(+), 83 deletions(-)
> 
> diff --git a/include/io/channel.h b/include/io/channel.h
> index 229bf36910..dfbe6f2931 100644
> --- a/include/io/channel.h
> +++ b/include/io/channel.h
> @@ -81,9 +81,11 @@ struct QIOChannel {
>      Object parent;
>      unsigned int features; /* bitmask of QIOChannelFeatures */
>      char *name;
> -    AioContext *ctx;
> +    AioContext *read_ctx;
>      Coroutine *read_coroutine;
> +    AioContext *write_ctx;
>      Coroutine *write_coroutine;
> +    bool follow_coroutine_ctx;
>  #ifdef _WIN32
>      HANDLE event; /* For use with GSource on Win32 */
>  #endif
> @@ -140,8 +142,9 @@ struct QIOChannelClass {
>                       int whence,
>                       Error **errp);
>      void (*io_set_aio_fd_handler)(QIOChannel *ioc,
> -                                  AioContext *ctx,
> +                                  AioContext *read_ctx,
>                                    IOHandler *io_read,
> +                                  AioContext *write_ctx,
>                                    IOHandler *io_write,
>                                    void *opaque);
>      int (*io_flush)(QIOChannel *ioc,
> @@ -498,6 +501,21 @@ int qio_channel_set_blocking(QIOChannel *ioc,
>                               bool enabled,
>                               Error **errp);
>  
> +/**
> + * qio_channel_set_follow_coroutine_ctx:
> + * @ioc: the channel object
> + * @enabled: whether or not to follow the coroutine's AioContext
> + *
> + * If @enabled is true, calls to qio_channel_yield() use the current
> + * coroutine's AioContext. Usually this is desirable.
> + *
> + * If @enabled is false, calls to qio_channel_yield() use the global
> + * iohandler AioContext. This may be used by coroutines that run in the
> + * main loop and do not wish to respond to I/O during nested event loops.
> + * This is the default for compatibility with code that is not aware of
> + * AioContexts.
> + */
> +void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled);
> +
>  /**
>   * qio_channel_close:
>   * @ioc: the channel object
> @@ -785,8 +803,9 @@ void qio_channel_wait(QIOChannel *ioc,
>  /**
>   * qio_channel_set_aio_fd_handler:
>   * @ioc: the channel object
> - * @ctx: the AioContext to set the handlers on
> + * @read_ctx: the AioContext to set the read handler on or NULL
>   * @io_read: the read handler
> + * @write_ctx: the AioContext to set the write handler on or NULL
>   * @io_write: the write handler
>   * @opaque: the opaque value passed to the handler
>   *
> @@ -794,10 +813,17 @@ void qio_channel_wait(QIOChannel *ioc,
>   * be used by channel implementations to forward the handlers
>   * to another channel (e.g. from #QIOChannelTLS to the
>   * underlying socket).
> + *
> + * When @read_ctx is NULL, don't touch the read handler. When @write_ctx is
> + * NULL, don't touch the write handler. Note that setting the read handler
> + * clears the write handler, and vice versa, if they share the same
> + * AioContext. Therefore the caller must pass both handlers together when
> + * sharing the same AioContext.
>   */
>  void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
> -                                    AioContext *ctx,
> +                                    AioContext *read_ctx,
>                                      IOHandler *io_read,
> +                                    AioContext *write_ctx,
>                                      IOHandler *io_write,
>                                      void *opaque);
>

Need to drop the qio_channel_attach_aio_context / qio_channel_detach_aio_context
methods from the header too.


> diff --git a/io/channel-command.c b/io/channel-command.c
> index 7ed726c802..1f61026222 100644
> --- a/io/channel-command.c
> +++ b/io/channel-command.c
> @@ -331,14 +331,21 @@ static int qio_channel_command_close(QIOChannel *ioc,
>  
>  
>  static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
> -                                                   AioContext *ctx,
> +                                                   AioContext *read_ctx,
>                                                     IOHandler *io_read,
> +                                                   AioContext *write_ctx,
>                                                     IOHandler *io_write,
>                                                     void *opaque)
>  {
>      QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
> -    aio_set_fd_handler(ctx, cioc->readfd, io_read, NULL, NULL, NULL, opaque);
> -    aio_set_fd_handler(ctx, cioc->writefd, NULL, io_write, NULL, NULL, opaque);
> +    if (read_ctx) {
> +        aio_set_fd_handler(read_ctx, cioc->readfd, io_read, NULL,
> +                NULL, NULL, opaque);
> +    }
> +    if (write_ctx) {
> +        aio_set_fd_handler(write_ctx, cioc->writefd, NULL, io_write,
> +                NULL, NULL, opaque);
> +    }
>  }
>  
>  
> diff --git a/io/channel-file.c b/io/channel-file.c
> index 8b5821f452..e6c6329dbb 100644
> --- a/io/channel-file.c
> +++ b/io/channel-file.c
> @@ -192,13 +192,27 @@ static int qio_channel_file_close(QIOChannel *ioc,
>  
>  
>  static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
> -                                                AioContext *ctx,
> +                                                AioContext *read_ctx,
>                                                  IOHandler *io_read,
> +                                                AioContext *write_ctx,
>                                                  IOHandler *io_write,
>                                                  void *opaque)
>  {
>      QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
> -    aio_set_fd_handler(ctx, fioc->fd, io_read, io_write, NULL, NULL, opaque);
> +
> +    if (read_ctx == write_ctx) {
> +        aio_set_fd_handler(read_ctx, fioc->fd, io_read, io_write,
> +                NULL, NULL, opaque);
> +    } else {
> +        if (read_ctx) {
> +            aio_set_fd_handler(read_ctx, fioc->fd, io_read, NULL,
> +                    NULL, NULL, opaque);
> +        }
> +        if (write_ctx) {
> +            aio_set_fd_handler(write_ctx, fioc->fd, NULL, io_write,
> +                    NULL, NULL, opaque);
> +        }
> +    }
>  }
>  
>  static GSource *qio_channel_file_create_watch(QIOChannel *ioc,
> diff --git a/io/channel-null.c b/io/channel-null.c
> index 4fafdb770d..ef99586348 100644
> --- a/io/channel-null.c
> +++ b/io/channel-null.c
> @@ -128,8 +128,9 @@ qio_channel_null_close(QIOChannel *ioc,
>  
>  static void
>  qio_channel_null_set_aio_fd_handler(QIOChannel *ioc G_GNUC_UNUSED,
> -                                    AioContext *ctx G_GNUC_UNUSED,
> +                                    AioContext *read_ctx G_GNUC_UNUSED,
>                                      IOHandler *io_read G_GNUC_UNUSED,
> +                                    AioContext *write_ctx G_GNUC_UNUSED,
>                                      IOHandler *io_write G_GNUC_UNUSED,
>                                      void *opaque G_GNUC_UNUSED)
>  {
> diff --git a/io/channel-socket.c b/io/channel-socket.c
> index d99945ebec..daeb92bbe0 100644
> --- a/io/channel-socket.c
> +++ b/io/channel-socket.c
> @@ -893,13 +893,27 @@ qio_channel_socket_shutdown(QIOChannel *ioc,
>  }
>  
>  static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
> -                                                  AioContext *ctx,
> +                                                  AioContext *read_ctx,
>                                                    IOHandler *io_read,
> +                                                  AioContext *write_ctx,
>                                                    IOHandler *io_write,
>                                                    void *opaque)
>  {
>      QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> -    aio_set_fd_handler(ctx, sioc->fd, io_read, io_write, NULL, NULL, opaque);
> +
> +    if (read_ctx == write_ctx) {
> +        aio_set_fd_handler(read_ctx, sioc->fd, io_read, io_write,
> +                           NULL, NULL, opaque);
> +    } else {
> +        if (read_ctx) {
> +            aio_set_fd_handler(read_ctx, sioc->fd, io_read, NULL,
> +                               NULL, NULL, opaque);
> +        }
> +        if (write_ctx) {
> +            aio_set_fd_handler(write_ctx, sioc->fd, NULL, io_write,
> +                               NULL, NULL, opaque);
> +        }
> +    }
>  }
>  
>  static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,

The file, command and socket impls all have fairly similar logic, and
could be handled by calling out to a common helper in channel-util.c
along the lines of this:

void qio_channel_util_set_aio_fd_handler(int read_fd,
                                         AioContext *read_ctx,
                                         IOHandler *io_read,
                                         int write_fd,
                                         AioContext *write_ctx,
                                         IOHandler *io_write,
                                         void *opaque)
{
    if (read_fd == write_fd && read_ctx == write_ctx) {
        aio_set_fd_handler(read_ctx, read_fd, io_read, io_write,
                           NULL, NULL, opaque);
    } else {
        if (read_ctx) {
            aio_set_fd_handler(read_ctx, read_fd, io_read, NULL,
                               NULL, NULL, opaque);
        }
        if (write_ctx) {
            aio_set_fd_handler(write_ctx, write_fd, NULL, io_write,
                               NULL, NULL, opaque);
        }
    }
}


> diff --git a/io/channel.c b/io/channel.c
> index c415f3fc88..b190d593d3 100644
> --- a/io/channel.c
> +++ b/io/channel.c
> @@ -365,6 +365,12 @@ int qio_channel_set_blocking(QIOChannel *ioc,
>  }
>  
>  
> +void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled)
> +{
> +    ioc->follow_coroutine_ctx = enabled;
> +}
> +
> +
>  int qio_channel_close(QIOChannel *ioc,
>                        Error **errp)
>  {
> @@ -542,56 +550,101 @@ static void qio_channel_restart_write(void *opaque)
>      aio_co_wake(co);
>  }
>  
> -static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
> +static void coroutine_fn
> +qio_channel_set_fd_handlers(QIOChannel *ioc, GIOCondition condition)
>  {
> -    IOHandler *rd_handler = NULL, *wr_handler = NULL;
> +    AioContext *ctx = ioc->follow_coroutine_ctx ?
> +        qemu_coroutine_get_aio_context(qemu_coroutine_self()) :
> +        iohandler_get_aio_context();

This is conditionally calling qemu_coroutine_get_aio_context
or iohandler_get_aio_context, but in qio_channel_yield, we
don't look at 'follow_coroutine_ctx' and unconditionally use
qemu_coroutine_get_aio_context.  Is that correct ?

Should we not just pass in the AioContext directly from
qio_channel_yield to ensure consistency ?

> +    AioContext *read_ctx = NULL;
> +    IOHandler *io_read = NULL;
> +    AioContext *write_ctx = NULL;
> +    IOHandler *io_write = NULL;
> +
> +    if (condition == G_IO_IN) {
> +        ioc->read_coroutine = qemu_coroutine_self();
> +        ioc->read_ctx = ctx;
> +        read_ctx = ctx;
> +        io_read = qio_channel_restart_read;
> +
> +        /*
> +         * Thread safety: if the other coroutine is set and its AioContext
> +         * match ours, then there is mutual exclusion between read and write
> +         * because they share a single thread and it's safe to set both read
> +         * and write fd handlers here. If the AioContext does not match ours,
> +         * then both threads may run in parallel but there is no shared state
> +         * to worry about.
> +         */
> +        if (ioc->write_coroutine && ioc->write_ctx == ctx) {
> +            write_ctx = ctx;
> +            io_write = qio_channel_restart_write;
> +        }
> +    } else if (condition == G_IO_OUT) {
> +        ioc->write_coroutine = qemu_coroutine_self();
> +        ioc->write_ctx = ctx;
> +        write_ctx = ctx;
> +        io_write = qio_channel_restart_write;
> +        if (ioc->read_coroutine && ioc->read_ctx == ctx) {
> +            read_ctx = ctx;
> +            io_read = qio_channel_restart_read;
> +        }
> +    } else {
> +        abort();
> +    }
> +
> +    qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
> +            write_ctx, io_write, ioc);
> +}

snip

>  void coroutine_fn qio_channel_yield(QIOChannel *ioc,
>                                      GIOCondition condition)
>  {
> -    AioContext *ioc_ctx = ioc->ctx ?: qemu_get_aio_context();
> +    AioContext *ioc_ctx;
>  
>      assert(qemu_in_coroutine());
> -    assert(in_aio_context_home_thread(ioc_ctx));
> +    ioc_ctx = qemu_coroutine_get_aio_context(qemu_coroutine_self());
>  
>      if (condition == G_IO_IN) {
>          assert(!ioc->read_coroutine);
> -        ioc->read_coroutine = qemu_coroutine_self();
>      } else if (condition == G_IO_OUT) {
>          assert(!ioc->write_coroutine);
> -        ioc->write_coroutine = qemu_coroutine_self();
>      } else {
>          abort();
>      }
> -    qio_channel_set_aio_fd_handlers(ioc);
> +    qio_channel_set_fd_handlers(ioc, condition);
>      qemu_coroutine_yield();
>      assert(in_aio_context_home_thread(ioc_ctx));
>  
> @@ -599,11 +652,10 @@ void coroutine_fn qio_channel_yield(QIOChannel *ioc,
>       * through the aio_fd_handlers. */
>      if (condition == G_IO_IN) {
>          assert(ioc->read_coroutine == NULL);
> -        qio_channel_set_aio_fd_handlers(ioc);
>      } else if (condition == G_IO_OUT) {
>          assert(ioc->write_coroutine == NULL);
> -        qio_channel_set_aio_fd_handlers(ioc);
>      }
> +    qio_channel_clear_fd_handlers(ioc, condition);
>  }
>  
>  void qio_channel_wake_read(QIOChannel *ioc)



With regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|

