On Mon, Mar 25, 2024 at 05:18:50PM +0800, zhuyangyang wrote:
> If g_main_loop_run()/aio_poll() is called in the coroutine context,
> the pending coroutine may be woken up repeatedly, and the co_queue_wakeup
> may be disordered.

aio_poll() must not be called from coroutine context:

  bool no_coroutine_fn aio_poll(AioContext *ctx, bool blocking);
       ^^^^^^^^^^^^^^^

Coroutines are not supposed to block. Instead, they should yield.

> When the poll() syscall exited in g_main_loop_run()/aio_poll(), it means
> some listened events is completed. Therefore, the completion callback
> function is dispatched.
>
> If this callback function needs to invoke aio_co_enter(), it will only
> wake up the coroutine (because we are already in coroutine context),
> which may cause that the data on this listening event_fd/socket_fd
> is not read/cleared. When the next poll () exits, it will be woken up again
> and inserted into the wakeup queue again.
>
> For example, if TLS is enabled in NBD, the server will call g_main_loop_run()
> in the coroutine, and repeatedly wake up the io_read event on a socket.
> The call stack is as follows:
>
> aio_co_enter()
> aio_co_wake()
> qio_channel_restart_read()
> aio_dispatch_handler()
> aio_dispatch_handlers()
> aio_dispatch()
> aio_ctx_dispatch()
> g_main_context_dispatch()
> g_main_loop_run()
> nbd_negotiate_handle_starttls()

This code does not look like it was designed to run in coroutine
context. Two options:

1. Don't run it in coroutine context (e.g. use a BH instead). This
   avoids blocking the coroutine but calling g_main_loop_run() is still
   ugly, in my opinion.

2. Get rid of data.loop and use coroutine APIs instead:

     while (!data.complete) {
         qemu_coroutine_yield();
     }

   and update nbd_tls_handshake() to call aio_co_wake(data->co) instead
   of g_main_loop_quit(data->loop).

   This requires auditing the code to check whether the event loop might
   invoke something that interferes with
   nbd_negotiate_handle_starttls(). Typically this means monitor
   commands or fd activity that could change the state of this
   connection while it is yielded. This is where the real work is but
   hopefully it will not be that hard to figure out.

> nbd_negotiate_options()
> nbd_negotiate()
> nbd_co_client_start()
> coroutine_trampoline()
>
> Signed-off-by: zhuyangyang <zhuyangyan...@huawei.com>
> ---
>  util/async.c | 13 ++++++++++++-
>  1 file changed, 12 insertions(+), 1 deletion(-)
>
> diff --git a/util/async.c b/util/async.c
> index 0467890052..25fc1e6083 100644
> --- a/util/async.c
> +++ b/util/async.c
> @@ -705,7 +705,18 @@ void aio_co_enter(AioContext *ctx, Coroutine *co)
>      if (qemu_in_coroutine()) {
>          Coroutine *self = qemu_coroutine_self();
>          assert(self != co);
> -        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
> +        /*
> +         * If the Coroutine *co is already in the co_queue_wakeup, this
> +         * repeated insertion will causes the loss of other queue element
> +         * or infinite loop.
> +         * For examplex:
> +         * Head->a->b->c->NULL, after insert_tail(head, b) => Head->a->b->NULL
> +         * Head->a-b>->NULL, after insert_tail(head, b) => Head->a->b->b...
> +         */
> +        if (!co->co_queue_next.sqe_next &&
> +            self->co_queue_wakeup.sqh_last != &co->co_queue_next.sqe_next) {
> +            QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
> +        }
>      } else {
>          qemu_aio_coroutine_enter(ctx, co);
>      }
> --
> 2.33.0
>
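
For reference, the two failure modes named in the patch comment can be
reproduced outside QEMU. What follows is only a minimal sketch: struct node,
struct queue and every function name are made up for illustration and are not
QEMU APIs. The tail insert uses the usual three steps of a BSD-style simple
queue, which I am assuming matches what QSIMPLEQ_INSERT_TAIL() does. The
guarded variant mirrors the check in the patch and additionally assumes that
a node outside the queue always has a NULL next pointer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for a singly-linked tail queue, not QEMU types. */
struct node {
    const char *name;
    struct node *next;
};

struct queue {
    struct node *first;
    struct node **last;   /* address of the final next pointer */
};

static void queue_init(struct queue *q)
{
    assert(q != NULL);
    q->first = NULL;
    q->last = &q->first;
}

/* Plain tail insert: clear next, link behind the tail, move the tail. */
static void insert_tail(struct queue *q, struct node *n)
{
    assert(q != NULL && n != NULL);
    n->next = NULL;
    *q->last = n;
    q->last = &n->next;
}

/*
 * Same test as the patch: a node with a successor, or the node that is the
 * current tail, is treated as already queued. This is only valid under the
 * assumption that unqueued nodes have next == NULL.
 */
static void insert_tail_guarded(struct queue *q, struct node *n)
{
    assert(q != NULL && n != NULL);
    if (n->next == NULL && q->last != &n->next) {
        insert_tail(q, n);
    }
}

/* Count reachable nodes, stopping after limit steps so a cycle terminates. */
static int count_nodes(const struct queue *q, int limit)
{
    int count = 0;
    for (const struct node *n = q->first; n != NULL && count < limit;
         n = n->next) {
        count++;
    }
    return count;
}

/* Head->a->b->c, then b is inserted a second time. */
int demo_lost_element(bool guarded)
{
    struct node a = { "a", NULL }, b = { "b", NULL }, c = { "c", NULL };
    struct queue q;

    queue_init(&q);
    insert_tail(&q, &a);
    insert_tail(&q, &b);
    insert_tail(&q, &c);
    if (guarded) {
        insert_tail_guarded(&q, &b);
    } else {
        insert_tail(&q, &b);   /* clears b.next, so c becomes unreachable */
    }
    return count_nodes(&q, 100);
}

/* Head->a->b, then the tail b is inserted a second time. */
int demo_cycle(bool guarded)
{
    struct node a = { "a", NULL }, b = { "b", NULL };
    struct queue q;

    queue_init(&q);
    insert_tail(&q, &a);
    insert_tail(&q, &b);
    if (guarded) {
        insert_tail_guarded(&q, &b);
    } else {
        insert_tail(&q, &b);   /* b.next now points at b itself */
    }
    return count_nodes(&q, 100);
}
```

Without the guard, demo_lost_element() reaches only two nodes and
demo_cycle() runs into the 100-step limit. With the guard both queues are
left intact. That shows the check hides the symptom, but it does not change
the fact that the coroutine is being woken from a nested event loop in the
first place.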