On Tue, Aug 05, 2014 at 06:44:25PM +0800, Ming Lei wrote:
> On Mon, Aug 4, 2014 at 11:56 PM, Stefan Hajnoczi <stefa...@redhat.com> wrote:
> > If two Linux AIO request completions are fetched in the same
> > io_getevents() call, QEMU will deadlock if request A's callback waits
> > for request B to complete using an aio_poll() loop.  This was reported
> > to happen with the mirror blockjob.
> >
> > This patch moves completion processing into a BH and makes it resumable.
> > Nested event loops can resume completion processing so that request B
> > will complete and the deadlock will not occur.
> >
> > Cc: Kevin Wolf <kw...@redhat.com>
> > Cc: Paolo Bonzini <pbonz...@redhat.com>
> > Cc: Ming Lei <ming....@canonical.com>
> > Cc: Marcin Gibuła <m.gib...@beyond.pl>
> > Reported-by: Marcin Gibuła <m.gib...@beyond.pl>
> > Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
> > ---
> >  block/linux-aio.c | 71 ++++++++++++++++++++++++++++++++++++++++++-------------
> >  1 file changed, 55 insertions(+), 16 deletions(-)
> >
> > diff --git a/block/linux-aio.c b/block/linux-aio.c
> > index 7ac7e8c..9aca758 100644
> > --- a/block/linux-aio.c
> > +++ b/block/linux-aio.c
> > @@ -51,6 +51,12 @@ struct qemu_laio_state {
> >
> >      /* io queue for submit at batch */
> >      LaioQueue io_q;
> > +
> > +    /* I/O completion processing */
> > +    QEMUBH *completion_bh;
> > +    struct io_event events[MAX_EVENTS];
> > +    int event_idx;
> > +    int event_max;
> >  };
> >
> >  static inline ssize_t io_event_ret(struct io_event *ev)
> > @@ -86,27 +92,58 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
> >      qemu_aio_release(laiocb);
> >  }
> >
> > -static void qemu_laio_completion_cb(EventNotifier *e)
> > +/* The completion BH fetches completed I/O requests and invokes their
> > + * callbacks.
> > + *
> > + * The function is somewhat tricky because it supports nested event loops, for
> > + * example when a request callback invokes aio_poll().  In order to do this,
>
> Looks it is a very tricky usage, maybe it is better to change the caller.

This comment is not about usage.  It's just for people reading the
implementation.  I can move it inside the function body, if you like.

I like the idea of eliminating nested event loops, but it requires a
huge change: making all callers either async (using callbacks) or
coroutines so they can yield.

There are many callers, so this is a lot of work and will have
side effects too.

BTW, here is the thread-pool.c fix which is analogous to this patch:
https://lists.nongnu.org/archive/html/qemu-devel/2014-07/msg02437.html

> > + * the completion events array and index are kept in qemu_laio_state.  The BH
> > + * reschedules itself as long as there are completions pending so it will
> > + * either be called again in a nested event loop or will be called after all
> > + * events have been completed.  When there are no events left to complete, the
> > + * BH returns without rescheduling.
> > + */
> > +static void qemu_laio_completion_bh(void *opaque)
> >  {
> > -    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
> > -
> > -    while (event_notifier_test_and_clear(&s->e)) {
> > -        struct io_event events[MAX_EVENTS];
> > -        struct timespec ts = { 0 };
> > -        int nevents, i;
> > +    struct qemu_laio_state *s = opaque;
> >
> > +    /* Fetch more completion events when empty */
> > +    if (s->event_idx == s->event_max) {
> >          do {
> > -            nevents = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, events, &ts);
> > -        } while (nevents == -EINTR);
> > +            struct timespec ts = { 0 };
> > +            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
> > +                                        s->events, &ts);
> > +        } while (s->event_max == -EINTR);
> > +
> > +        s->event_idx = 0;
> > +        if (s->event_max <= 0) {
> > +            s->event_max = 0;
> > +            return; /* no more events */
> > +        }
> > +    }
> >
> > -        for (i = 0; i < nevents; i++) {
> > -            struct iocb *iocb = events[i].obj;
> > -            struct qemu_laiocb *laiocb =
> > -                    container_of(iocb, struct qemu_laiocb, iocb);
> > +    /* Reschedule so nested event loops see currently pending completions */
> > +    qemu_bh_schedule(s->completion_bh);
> >
> > -            laiocb->ret = io_event_ret(&events[i]);
> > -            qemu_laio_process_completion(s, laiocb);
> > -        }
> > +    /* Process completion events */
> > +    while (s->event_idx < s->event_max) {
> > +        struct iocb *iocb = s->events[s->event_idx].obj;
> > +        struct qemu_laiocb *laiocb =
> > +                container_of(iocb, struct qemu_laiocb, iocb);
> > +
> > +        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
> > +        s->event_idx++;
> > +
> > +        qemu_laio_process_completion(s, laiocb);
>
> The implementation is same tricky with the usage, :-)
>
> Also using a FIFO style implementation should be more efficient
> since IO events can still be read and completed in current BH handler
> if the queue isn't full, but becomes more complicated.

That might help but should be benchmarked.

Another trick is calling qemu_laio_completion_bh() directly from
qemu_laio_completion_cb() to avoid a BH iteration.

I think both are premature optimizations.  Let's first agree whether
this fix is correct or not :).

Stefan