On Tue, Aug 05, 2014 at 06:44:25PM +0800, Ming Lei wrote:
> On Mon, Aug 4, 2014 at 11:56 PM, Stefan Hajnoczi <stefa...@redhat.com> wrote:
> > If two Linux AIO request completions are fetched in the same
> > io_getevents() call, QEMU will deadlock if request A's callback waits
> > for request B to complete using an aio_poll() loop.  This was reported
> > to happen with the mirror blockjob.
> >
> > This patch moves completion processing into a BH and makes it resumable.
> > Nested event loops can resume completion processing so that request B
> > will complete and the deadlock will not occur.
> >
> > Cc: Kevin Wolf <kw...@redhat.com>
> > Cc: Paolo Bonzini <pbonz...@redhat.com>
> > Cc: Ming Lei <ming....@canonical.com>
> > Cc: Marcin Gibuła <m.gib...@beyond.pl>
> > Reported-by: Marcin Gibuła <m.gib...@beyond.pl>
> > Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
> > ---
> >  block/linux-aio.c | 71 ++++++++++++++++++++++++++++++++++++++++++-------------
> >  1 file changed, 55 insertions(+), 16 deletions(-)
> >
> > diff --git a/block/linux-aio.c b/block/linux-aio.c
> > index 7ac7e8c..9aca758 100644
> > --- a/block/linux-aio.c
> > +++ b/block/linux-aio.c
> > @@ -51,6 +51,12 @@ struct qemu_laio_state {
> >
> >      /* io queue for submit at batch */
> >      LaioQueue io_q;
> > +
> > +    /* I/O completion processing */
> > +    QEMUBH *completion_bh;
> > +    struct io_event events[MAX_EVENTS];
> > +    int event_idx;
> > +    int event_max;
> >  };
> >
> >  static inline ssize_t io_event_ret(struct io_event *ev)
> > @@ -86,27 +92,58 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
> >      qemu_aio_release(laiocb);
> >  }
> >
> > -static void qemu_laio_completion_cb(EventNotifier *e)
> > +/* The completion BH fetches completed I/O requests and invokes their
> > + * callbacks.
> > + *
> > + * The function is somewhat tricky because it supports nested event loops, for
> > + * example when a request callback invokes aio_poll().  In order to do this,
> 
> This looks like a very tricky usage; maybe it is better to change the caller.

This comment is not about usage.  It's just for people reading the
implementation.  I can move it inside the function body, if you like.
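
For readers who want to see the idea in isolation, here is a minimal
standalone sketch.  It is not QEMU code: model_state, model_poll(),
model_request_cb() and model_run() are hypothetical names made up for
illustration, and the io_getevents() refill is left out.  It only models
why keeping the event index in the state and rescheduling before the
callbacks run lets a nested loop finish request B:

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_EVENTS 4

/* Hypothetical stand-in for qemu_laio_state: only the resumable cursor. */
struct model_state {
    int events[MAX_EVENTS]; /* fetched completions (request ids) */
    int event_idx;          /* next completion to process */
    int event_max;          /* number of fetched completions */
    bool bh_scheduled;      /* models qemu_bh_schedule() */
    bool done[MAX_EVENTS];  /* done[id] is set once request id completes */
    int order[MAX_EVENTS];  /* completion order, for inspection */
    int n_done;
};

static void model_completion_bh(struct model_state *s);

/* Models one aio_poll() iteration: run the BH if it is scheduled. */
static bool model_poll(struct model_state *s)
{
    if (!s->bh_scheduled) {
        return false; /* no progress possible */
    }
    s->bh_scheduled = false;
    model_completion_bh(s);
    return true;
}

/* Request 0's callback waits for request 1 in a nested event loop. */
static void model_request_cb(struct model_state *s, int id)
{
    if (id == 0) {
        while (!s->done[1]) {
            bool progress = model_poll(s);
            assert(progress); /* fails if the nested loop cannot resume */
        }
    }
    s->done[id] = true;
    s->order[s->n_done++] = id;
}

static void model_completion_bh(struct model_state *s)
{
    if (s->event_idx == s->event_max) {
        return; /* nothing left; the real code would try to fetch more */
    }

    /* Reschedule first so a nested loop can pick up where we left off. */
    s->bh_scheduled = true;

    while (s->event_idx < s->event_max) {
        int id = s->events[s->event_idx];
        s->event_idx++; /* advance before the callback: it may re-enter */
        model_request_cb(s, id);
    }
}

/* Both completions arrive in one batch, as in the reported deadlock. */
static struct model_state model_run(void)
{
    struct model_state s = { .events = { 0, 1 }, .event_max = 2,
                             .bh_scheduled = true };
    while (model_poll(&s)) {
        /* keep going until the BH stops rescheduling itself */
    }
    return s;
}
```

In this model request 1 completes inside request 0's nested loop, so the
recorded order is 1 then 0.  If the events array and index were local to
the BH function, the nested call would have nothing to resume from.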

I like the idea of eliminating nested event loops, but it requires a
huge change: making all callers either async (using callbacks) or
coroutines so they can yield.

There are many callers so this is a lot of work and will have
side-effects too.

BTW, here is the thread-pool.c fix which is analogous to this patch:
https://lists.nongnu.org/archive/html/qemu-devel/2014-07/msg02437.html

> > + * the completion events array and index are kept in qemu_laio_state.  The BH
> > + * reschedules itself as long as there are completions pending so it will
> > + * either be called again in a nested event loop or will be called after all
> > + * events have been completed.  When there are no events left to complete, the
> > + * BH returns without rescheduling.
> > + */
> > +static void qemu_laio_completion_bh(void *opaque)
> >  {
> > -    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
> > -
> > -    while (event_notifier_test_and_clear(&s->e)) {
> > -        struct io_event events[MAX_EVENTS];
> > -        struct timespec ts = { 0 };
> > -        int nevents, i;
> > +    struct qemu_laio_state *s = opaque;
> >
> > +    /* Fetch more completion events when empty */
> > +    if (s->event_idx == s->event_max) {
> >          do {
> > -            nevents = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, events, &ts);
> > -        } while (nevents == -EINTR);
> > +            struct timespec ts = { 0 };
> > +            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
> > +                                        s->events, &ts);
> > +        } while (s->event_max == -EINTR);
> > +
> > +        s->event_idx = 0;
> > +        if (s->event_max <= 0) {
> > +            s->event_max = 0;
> > +            return; /* no more events */
> > +        }
> > +    }
> >
> > -        for (i = 0; i < nevents; i++) {
> > -            struct iocb *iocb = events[i].obj;
> > -            struct qemu_laiocb *laiocb =
> > -                    container_of(iocb, struct qemu_laiocb, iocb);
> > +    /* Reschedule so nested event loops see currently pending completions */
> > +    qemu_bh_schedule(s->completion_bh);
> >
> > -            laiocb->ret = io_event_ret(&events[i]);
> > -            qemu_laio_process_completion(s, laiocb);
> > -        }
> > +    /* Process completion events */
> > +    while (s->event_idx < s->event_max) {
> > +        struct iocb *iocb = s->events[s->event_idx].obj;
> > +        struct qemu_laiocb *laiocb =
> > +                container_of(iocb, struct qemu_laiocb, iocb);
> > +
> > +        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
> > +        s->event_idx++;
> > +
> > +        qemu_laio_process_completion(s, laiocb);
> 
> The implementation is as tricky as the usage :-)
> 
> Also, a FIFO-style implementation should be more efficient, since I/O
> events can still be read and completed in the current BH handler if the
> queue isn't full, but it becomes more complicated.

That might help but should be benchmarked.

Another trick is calling qemu_laio_completion_bh() directly from
qemu_laio_completion_cb() to avoid a BH iteration.
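
A rough sketch of the difference, assuming the notifier callback
otherwise only schedules the BH (that part of the patch is not quoted
above).  Everything here is a simplified, hypothetical stand-in
(sketch_state and the sketch_* functions), not the QEMU types or API:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified stand-ins; not the QEMU types. */
struct sketch_state {
    bool notifier_set;  /* models the EventNotifier being signalled */
    bool bh_scheduled;  /* models a pending BH */
    int pending;        /* completions waiting to be processed */
    int processed;      /* completions already processed */
    int bh_iterations;  /* trips through the BH dispatch path */
};

/* Stand-in for the completion BH body: drain what is pending. */
static void sketch_completion_bh(struct sketch_state *s)
{
    s->processed += s->pending;
    s->pending = 0;
}

/* Variant A: the notifier callback only schedules the BH. */
static void sketch_cb_schedule(struct sketch_state *s)
{
    if (s->notifier_set) {
        s->notifier_set = false;
        s->bh_scheduled = true;
    }
}

/* Variant B: the notifier callback calls the BH function directly. */
static void sketch_cb_direct(struct sketch_state *s)
{
    if (s->notifier_set) {
        s->notifier_set = false;
        sketch_completion_bh(s);
    }
}

/* One event loop iteration: dispatch the BH if it is scheduled. */
static void sketch_run_bhs(struct sketch_state *s)
{
    if (s->bh_scheduled) {
        s->bh_scheduled = false;
        s->bh_iterations++;
        sketch_completion_bh(s);
    }
}
```

With variant A nothing is processed until the next BH dispatch; with
variant B the completions are handled in the callback itself and no BH
iteration is needed for the first batch.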

I think both are premature optimizations.  Let's first agree whether this
fix is correct or not :).

Stefan
