On Wed, Aug 23, 2023 at 07:45:04PM -0400, Stefan Hajnoczi wrote:
> The ongoing QEMU multi-queue block layer effort makes it possible for multiple
> threads to process I/O in parallel. The nbd block driver is not compatible with
> the multi-queue block layer yet because QIOChannel cannot be used easily from
> coroutines running in multiple threads. This series changes the QIOChannel API
> to make that possible.
>
> In the current API, calling qio_channel_attach_aio_context() sets the
> AioContext where qio_channel_yield() installs an fd handler prior to yielding:
>
>   qio_channel_attach_aio_context(ioc, my_ctx);
>   ...
>   qio_channel_yield(ioc); // my_ctx is used here
>   ...
>   qio_channel_detach_aio_context(ioc);
>
> This API design has limitations: reading and writing must be done in the same
> AioContext and moving between AioContexts involves a cumbersome sequence of API
> calls that is not suitable for doing on a per-request basis.
>
> There is no fundamental reason why a QIOChannel needs to run within the
> same AioContext every time qio_channel_yield() is called. QIOChannel
> only uses the AioContext while inside qio_channel_yield(). The rest of
> the time, QIOChannel is independent of any AioContext.
>
> In the new API, qio_channel_yield() queries the AioContext from the current
> coroutine using qemu_coroutine_get_aio_context(). There is no need to
> explicitly attach/detach AioContexts anymore and
> qio_channel_attach_aio_context() and qio_channel_detach_aio_context() are gone.
> One coroutine can read from the QIOChannel while another coroutine writes from
> a different AioContext.
>
> This API change allows the nbd block driver to use QIOChannel from any thread.
> It's important to keep in mind that the block driver already synchronizes
> QIOChannel access and ensures that two coroutines never read simultaneously or
> write simultaneously.
>
> This patch updates all users of qio_channel_attach_aio_context() to the
> new API. Most conversions are simple, but vhost-user-server requires a
> new qemu_coroutine_yield() call to quiesce the vu_client_trip()
> coroutine when not attached to any AioContext.
>
> While the API has become simpler, there is one wart: QIOChannel has a
> special case for the iohandler AioContext (used for handlers that must not run
> in nested event loops). I didn't find an elegant way to preserve that behavior,
> so I added a new API called qio_channel_set_follow_coroutine_ctx(ioc, true|false)
> for opting in to the new AioContext model. By default QIOChannel uses the
> iohandler AioContext. Code that formerly called
> qio_channel_attach_aio_context() now calls
> qio_channel_set_follow_coroutine_ctx(ioc, true) once after the QIOChannel is
> created.

I wonder if it is better to just pass the AioContext object into
qio_channel_yield() explicitly, e.g. have

  qio_channel_yield(QIOChannel *ioc,
                    AioContext *ctx,
                    GIOCondition cond);

with the semantics that if 'ctx == NULL' we assume the default global
iohandler context, and for any non-default context it must be non-NULL?

That could nicely decouple the API from relying on global coroutine/thread
state for querying an AioContext, which makes it easier to reason about IMHO.

>
> Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
> ---
>  include/io/channel.h             |  34 +++++++--
>  include/qemu/vhost-user-server.h |   1 +
>  block/nbd.c                      |  11 +--
>  io/channel-command.c             |  13 +++-
>  io/channel-file.c                |  18 ++++-
>  io/channel-null.c                |   3 +-
>  io/channel-socket.c              |  18 ++++-
>  io/channel-tls.c                 |   6 +-
>  io/channel.c                     | 120 ++++++++++++++++++++++---------
>  migration/channel-block.c        |   3 +-
>  nbd/client.c                     |   2 +-
>  nbd/server.c                     |  14 +---
>  scsi/qemu-pr-helper.c            |   4 +-
>  util/vhost-user-server.c         |  27 +++++--
>  14 files changed, 191 insertions(+), 83 deletions(-)
>
> diff --git a/include/io/channel.h b/include/io/channel.h
> index 229bf36910..dfbe6f2931 100644
> --- a/include/io/channel.h
> +++ b/include/io/channel.h
> @@ -81,9 +81,11 @@ struct QIOChannel {
>      Object parent;
>      unsigned int features; /* bitmask of QIOChannelFeatures */
>      char *name;
> -    AioContext *ctx;
> +    AioContext *read_ctx;
>      Coroutine *read_coroutine;
> +    AioContext *write_ctx;
>      Coroutine *write_coroutine;
> +    bool follow_coroutine_ctx;
>  #ifdef _WIN32
>      HANDLE event; /* For use with GSource on Win32 */
>  #endif
> @@ -140,8 +142,9 @@ struct QIOChannelClass {
>                       int whence,
>                       Error **errp);
>      void (*io_set_aio_fd_handler)(QIOChannel *ioc,
> -                                  AioContext *ctx,
> +                                  AioContext *read_ctx,
>                                    IOHandler *io_read,
> +                                  AioContext *write_ctx,
>                                    IOHandler *io_write,
>                                    void *opaque);
>      int (*io_flush)(QIOChannel *ioc,
> @@ -498,6 +501,21 @@ int qio_channel_set_blocking(QIOChannel *ioc,
>                               bool enabled,
>                               Error **errp);
>
> +/**
> + * qio_channel_set_follow_coroutine_ctx:
> + * @ioc: the channel object
> + * @enabled: whether or not to follow the coroutine's AioContext
> + *
> + * If @enabled is true, calls to qio_channel_yield() use the current
> + * coroutine's AioContext. Usually this is desirable.
> + *
> + * If @enabled is false, calls to qio_channel_yield() use the global iohandler
> + * AioContext. This may be used by coroutines that run in the main loop and
> + * do not wish to respond to I/O during nested event loops. This is the
> + * default for compatibility with code that is not aware of AioContexts.
> + */
> +void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled);
> +
>  /**
>   * qio_channel_close:
>   * @ioc: the channel object
> @@ -785,8 +803,9 @@ void qio_channel_wait(QIOChannel *ioc,
>  /**
>   * qio_channel_set_aio_fd_handler:
>   * @ioc: the channel object
> - * @ctx: the AioContext to set the handlers on
> + * @read_ctx: the AioContext to set the read handler on or NULL
>   * @io_read: the read handler
> + * @write_ctx: the AioContext to set the write handler on or NULL
>   * @io_write: the write handler
>   * @opaque: the opaque value passed to the handler
>   *
> @@ -794,10 +813,17 @@ void qio_channel_wait(QIOChannel *ioc,
>   * be used by channel implementations to forward the handlers
>   * to another channel (e.g. from #QIOChannelTLS to the
>   * underlying socket).
> + *
> + * When @read_ctx is NULL, don't touch the read handler. When @write_ctx is
> + * NULL, don't touch the write handler. Note that setting the read handler
> + * clears the write handler, and vice versa, if they share the same AioContext.
> + * Therefore the caller must pass both handlers together when sharing the same
> + * AioContext.
>   */
>  void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
> -                                    AioContext *ctx,
> +                                    AioContext *read_ctx,
>                                      IOHandler *io_read,
> +                                    AioContext *write_ctx,
>                                      IOHandler *io_write,
>                                      void *opaque);
>

The qio_channel_attach_aio_context / qio_channel_detach_aio_context
declarations need to be dropped from the header too.

> diff --git a/io/channel-command.c b/io/channel-command.c
> index 7ed726c802..1f61026222 100644
> --- a/io/channel-command.c
> +++ b/io/channel-command.c
> @@ -331,14 +331,21 @@ static int qio_channel_command_close(QIOChannel *ioc,
>
>
>  static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
> -                                                   AioContext *ctx,
> +                                                   AioContext *read_ctx,
>                                                     IOHandler *io_read,
> +                                                   AioContext *write_ctx,
>                                                     IOHandler *io_write,
>                                                     void *opaque)
>  {
>      QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
> -    aio_set_fd_handler(ctx, cioc->readfd, io_read, NULL, NULL, NULL, opaque);
> -    aio_set_fd_handler(ctx, cioc->writefd, NULL, io_write, NULL, NULL, opaque);
> +    if (read_ctx) {
> +        aio_set_fd_handler(read_ctx, cioc->readfd, io_read, NULL,
> +                           NULL, NULL, opaque);
> +    }
> +    if (write_ctx) {
> +        aio_set_fd_handler(write_ctx, cioc->writefd, NULL, io_write,
> +                           NULL, NULL, opaque);
> +    }
>  }
>
>
> diff --git a/io/channel-file.c b/io/channel-file.c
> index 8b5821f452..e6c6329dbb 100644
> --- a/io/channel-file.c
> +++ b/io/channel-file.c
> @@ -192,13 +192,27 @@ static int qio_channel_file_close(QIOChannel *ioc,
>
>
>  static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
> -                                                AioContext *ctx,
> +                                                AioContext *read_ctx,
>                                                  IOHandler *io_read,
> +                                                AioContext *write_ctx,
>                                                  IOHandler *io_write,
>                                                  void *opaque)
>  {
>      QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
> -    aio_set_fd_handler(ctx, fioc->fd, io_read, io_write, NULL, NULL, opaque);
> +
> +    if (read_ctx == write_ctx) {
> +        aio_set_fd_handler(read_ctx, fioc->fd, io_read, io_write,
> +                           NULL, NULL, opaque);
> +    } else {
> +        if (read_ctx) {
> +            aio_set_fd_handler(read_ctx, fioc->fd, io_read, NULL,
> +                               NULL, NULL, opaque);
> +        }
> +        if (write_ctx) {
> +            aio_set_fd_handler(write_ctx, fioc->fd, NULL, io_write,
> +                               NULL, NULL, opaque);
> +        }
> +    }
>  }
>
>  static GSource *qio_channel_file_create_watch(QIOChannel *ioc,
> diff --git a/io/channel-null.c b/io/channel-null.c
> index 4fafdb770d..ef99586348 100644
> --- a/io/channel-null.c
> +++ b/io/channel-null.c
> @@ -128,8 +128,9 @@ qio_channel_null_close(QIOChannel *ioc,
>
>  static void
>  qio_channel_null_set_aio_fd_handler(QIOChannel *ioc G_GNUC_UNUSED,
> -                                    AioContext *ctx G_GNUC_UNUSED,
> +                                    AioContext *read_ctx G_GNUC_UNUSED,
>                                      IOHandler *io_read G_GNUC_UNUSED,
> +                                    AioContext *write_ctx G_GNUC_UNUSED,
>                                      IOHandler *io_write G_GNUC_UNUSED,
>                                      void *opaque G_GNUC_UNUSED)
>  {
> diff --git a/io/channel-socket.c b/io/channel-socket.c
> index d99945ebec..daeb92bbe0 100644
> --- a/io/channel-socket.c
> +++ b/io/channel-socket.c
> @@ -893,13 +893,27 @@ qio_channel_socket_shutdown(QIOChannel *ioc,
>  }
>
>  static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
> -                                                  AioContext *ctx,
> +                                                  AioContext *read_ctx,
>                                                    IOHandler *io_read,
> +                                                  AioContext *write_ctx,
>                                                    IOHandler *io_write,
>                                                    void *opaque)
>  {
>      QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> -    aio_set_fd_handler(ctx, sioc->fd, io_read, io_write, NULL, NULL, opaque);
> +
> +    if (read_ctx == write_ctx) {
> +        aio_set_fd_handler(read_ctx, sioc->fd, io_read, io_write,
> +                           NULL, NULL, opaque);
> +    } else {
> +        if (read_ctx) {
> +            aio_set_fd_handler(read_ctx, sioc->fd, io_read, NULL,
> +                               NULL, NULL, opaque);
> +        }
> +        if (write_ctx) {
> +            aio_set_fd_handler(write_ctx, sioc->fd, NULL, io_write,
> +                               NULL, NULL, opaque);
> +        }
> +    }
>  }
>
>  static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,

The file, command and socket impls all have fairly similar logic, and
could be handled by calling out to a common helper in channel-util.c
along the lines of this:

  void qio_channel_util_set_aio_fd_handler(int read_fd,
                                           AioContext *read_ctx,
                                           IOHandler *io_read,
                                           int write_fd,
                                           AioContext *write_ctx,
                                           IOHandler *io_write,
                                           void *opaque)
  {
      if (read_fd == write_fd && read_ctx == write_ctx) {
          aio_set_fd_handler(read_ctx, read_fd, io_read, io_write,
                             NULL, NULL, opaque);
      } else {
          if (read_ctx) {
              aio_set_fd_handler(read_ctx, read_fd, io_read, NULL,
                                 NULL, NULL, opaque);
          }
          if (write_ctx) {
              aio_set_fd_handler(write_ctx, write_fd, NULL, io_write,
                                 NULL, NULL, opaque);
          }
      }
  }

> diff --git a/io/channel.c b/io/channel.c
> index c415f3fc88..b190d593d3 100644
> --- a/io/channel.c
> +++ b/io/channel.c
> @@ -365,6 +365,12 @@ int qio_channel_set_blocking(QIOChannel *ioc,
>  }
>
>
> +void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled)
> +{
> +    ioc->follow_coroutine_ctx = enabled;
> +}
> +
> +
>  int qio_channel_close(QIOChannel *ioc,
>                        Error **errp)
>  {
> @@ -542,56 +550,101 @@ static void qio_channel_restart_write(void *opaque)
>      aio_co_wake(co);
>  }
>
> -static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
> +static void coroutine_fn
> +qio_channel_set_fd_handlers(QIOChannel *ioc, GIOCondition condition)
>  {
> -    IOHandler *rd_handler = NULL, *wr_handler = NULL;
> +    AioContext *ctx = ioc->follow_coroutine_ctx ?
> +        qemu_coroutine_get_aio_context(qemu_coroutine_self()) :
> +        iohandler_get_aio_context();

This is conditionally calling qemu_coroutine_get_aio_context or
iohandler_get_aio_context, but in qio_channel_yield, we don't look at
'follow_coroutine_ctx' and unconditionally use qemu_coroutine_get_aio_context.
Is that correct? Should we not just pass in the AioContext directly from
qio_channel_yield to ensure consistency?

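To make the consistency question above concrete, here is a small standalone
model. It is not QEMU code: every Model*/model_* name is a hypothetical
stand-in invented for illustration, and the coroutine's context is passed in
as a plain argument rather than queried from real coroutine state. It only
sketches the shape I have in mind, where the context is resolved in one place
and the same pointer is handed to the handler setup.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for AioContext; only pointer identity matters here. */
typedef struct ModelAioContext {
    const char *name;
} ModelAioContext;

/* Hypothetical stand-in for the QIOChannel fields touched by the patch. */
typedef struct ModelChannel {
    bool follow_coroutine_ctx;
    ModelAioContext *read_ctx;
    ModelAioContext *write_ctx;
} ModelChannel;

typedef enum ModelCondition {
    MODEL_IO_IN,
    MODEL_IO_OUT,
} ModelCondition;

/* Models the global iohandler context used when the flag is false. */
static ModelAioContext model_iohandler_ctx = { "iohandler" };

/* The only place in this model that consults follow_coroutine_ctx. */
static ModelAioContext *model_resolve_ctx(const ModelChannel *ioc,
                                          ModelAioContext *coroutine_ctx)
{
    return ioc->follow_coroutine_ctx ? coroutine_ctx : &model_iohandler_ctx;
}

/* Handler setup receives the context instead of recomputing it. */
static void model_set_fd_handlers(ModelChannel *ioc, ModelAioContext *ctx,
                                  ModelCondition condition)
{
    if (condition == MODEL_IO_IN) {
        ioc->read_ctx = ctx;
    } else {
        ioc->write_ctx = ctx;
    }
}

/*
 * Models the yield path: resolve once, then hand the same pointer to every
 * later step. Returns the chosen context so a caller can check it.
 */
static ModelAioContext *model_yield(ModelChannel *ioc,
                                    ModelAioContext *coroutine_ctx,
                                    ModelCondition condition)
{
    ModelAioContext *ctx = model_resolve_ctx(ioc, coroutine_ctx);

    assert(ctx != NULL);
    model_set_fd_handlers(ioc, ctx, condition);
    return ctx;
}
```

With the flag left false, model_yield() records and returns the iohandler
stand-in regardless of which context the caller runs in; with the flag set,
it records and returns the caller's context. Either way, any check made after
the yield would refer to the same context the handler was installed on.
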
> +    AioContext *read_ctx = NULL;
> +    IOHandler *io_read = NULL;
> +    AioContext *write_ctx = NULL;
> +    IOHandler *io_write = NULL;
> +
> +    if (condition == G_IO_IN) {
> +        ioc->read_coroutine = qemu_coroutine_self();
> +        ioc->read_ctx = ctx;
> +        read_ctx = ctx;
> +        io_read = qio_channel_restart_read;
> +
> +        /*
> +         * Thread safety: if the other coroutine is set and its AioContext
> +         * matches ours, then there is mutual exclusion between read and write
> +         * because they share a single thread and it's safe to set both read
> +         * and write fd handlers here. If the AioContext does not match ours,
> +         * then both threads may run in parallel but there is no shared state
> +         * to worry about.
> +         */
> +        if (ioc->write_coroutine && ioc->write_ctx == ctx) {
> +            write_ctx = ctx;
> +            io_write = qio_channel_restart_write;
> +        }
> +    } else if (condition == G_IO_OUT) {
> +        ioc->write_coroutine = qemu_coroutine_self();
> +        ioc->write_ctx = ctx;
> +        write_ctx = ctx;
> +        io_write = qio_channel_restart_write;
> +        if (ioc->read_coroutine && ioc->read_ctx == ctx) {
> +            read_ctx = ctx;
> +            io_read = qio_channel_restart_read;
> +        }
> +    } else {
> +        abort();
> +    }
> +
> +    qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
> +                                   write_ctx, io_write, ioc);
> +}

snip

>  void coroutine_fn qio_channel_yield(QIOChannel *ioc,
>                                      GIOCondition condition)
>  {
> -    AioContext *ioc_ctx = ioc->ctx ?: qemu_get_aio_context();
> +    AioContext *ioc_ctx;
>
>      assert(qemu_in_coroutine());
> -    assert(in_aio_context_home_thread(ioc_ctx));
> +    ioc_ctx = qemu_coroutine_get_aio_context(qemu_coroutine_self());
>
>      if (condition == G_IO_IN) {
>          assert(!ioc->read_coroutine);
> -        ioc->read_coroutine = qemu_coroutine_self();
>      } else if (condition == G_IO_OUT) {
>          assert(!ioc->write_coroutine);
> -        ioc->write_coroutine = qemu_coroutine_self();
>      } else {
>          abort();
>      }
> -    qio_channel_set_aio_fd_handlers(ioc);
> +    qio_channel_set_fd_handlers(ioc, condition);
>      qemu_coroutine_yield();
>      assert(in_aio_context_home_thread(ioc_ctx));
>
> @@ -599,11 +652,10 @@ void coroutine_fn qio_channel_yield(QIOChannel *ioc,
>       * through the aio_fd_handlers. */
>      if (condition == G_IO_IN) {
>          assert(ioc->read_coroutine == NULL);
> -        qio_channel_set_aio_fd_handlers(ioc);
>      } else if (condition == G_IO_OUT) {
>          assert(ioc->write_coroutine == NULL);
> -        qio_channel_set_aio_fd_handlers(ioc);
>      }
> +    qio_channel_clear_fd_handlers(ioc, condition);
>  }
>
>  void qio_channel_wake_read(QIOChannel *ioc)

With regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|