On Wed, Nov 26, 2014 at 7:18 PM, Kevin Wolf <kw...@redhat.com> wrote:
> Am 25.11.2014 um 08:23 hat Ming Lei geschrieben:
>> In the submit path, we can't complete requests directly,
>> otherwise "Co-routine re-entered recursively" may be caused,
>> so this patch fixes the issue with the ideas below:
>>
>> - for -EAGAIN or partial completion, retry the submission
>>   in the following completion cb, which runs in BH context
>> - for partial completion, update the io queue too
>> - for the case of a full io queue, submit queued requests
>>   immediately and return failure to the caller
>> - for other failures, abort all queued requests in BH
>>   context; requests won't be allowed to submit until
>>   aborting is handled
>>
>> Reviewed-by: Paolo Bonzini <pbonz...@redhat.com>
>> Signed-off-by: Ming Lei <ming....@canonical.com>
>
> This looks like a quite complex fix to this problem, and it introduces
> new error cases, too (while aborting, requests fail now, but they really
> should just be waiting).
"Co-routine re-entered recursively" is only triggered when io_submit() returns failure(either -EAGAIN or other error) or partial completion, so this patch actually is for handling failure of io_submit() and making linux-aio more reliably. After io queue is introduce, it is a bit easy to trigger the failure, especially in case of multi-queue, or VM driver sets a longer queue depth. So all cases(EAGAIN, other failure and partial completion) have to be considered too. So it doesn't mean a complex fix, and the actual problem is complex too. Not mention previously linux aio doesn't handle -EAGAIN. > > I'm wondering if this is the time to convert the linux-aio interface to > coroutines finally. It wouldn't only be a performance optimisation, but > would potentially also simplify this code. Even with coroutine, the above io_submit() issues has to be considered too. > >> block/linux-aio.c | 114 >> ++++++++++++++++++++++++++++++++++++++++++++--------- >> 1 file changed, 95 insertions(+), 19 deletions(-) >> >> diff --git a/block/linux-aio.c b/block/linux-aio.c >> index d92513b..11ac828 100644 >> --- a/block/linux-aio.c >> +++ b/block/linux-aio.c >> @@ -38,11 +38,20 @@ struct qemu_laiocb { >> QLIST_ENTRY(qemu_laiocb) node; >> }; >> >> +/* >> + * TODO: support to batch I/O from multiple bs in one same >> + * AIO context, one important use case is multi-lun scsi, >> + * so in future the IO queue should be per AIO context. >> + */ >> typedef struct { >> struct iocb *iocbs[MAX_QUEUED_IO]; >> int plugged; >> unsigned int size; >> unsigned int idx; >> + >> + /* abort queued requests in BH context */ >> + QEMUBH *abort_bh; >> + bool aborting; > > Two spaces. OK. 
>>  } LaioQueue;
>>
>>  struct qemu_laio_state {
>> @@ -59,6 +68,8 @@ struct qemu_laio_state {
>>      int event_max;
>>  };
>>
>> +static int ioq_submit(struct qemu_laio_state *s);
>> +
>>  static inline ssize_t io_event_ret(struct io_event *ev)
>>  {
>>      return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
>> @@ -91,6 +102,13 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
>>      qemu_aio_unref(laiocb);
>>  }
>>
>> +static void qemu_laio_start_retry(struct qemu_laio_state *s)
>> +{
>> +    if (s->io_q.idx) {
>> +        ioq_submit(s);
>> +    }
>> +}
>> +
>>  /* The completion BH fetches completed I/O requests and invokes their
>>   * callbacks.
>>   *
>> @@ -135,6 +153,8 @@ static void qemu_laio_completion_bh(void *opaque)
>>
>>          qemu_laio_process_completion(s, laiocb);
>>      }
>> +
>> +    qemu_laio_start_retry(s);
>>  }
>
> Why is qemu_laio_start_retry() a separate function? This is the only
> caller.

OK.

>>  static void qemu_laio_completion_cb(EventNotifier *e)
>> @@ -175,47 +195,99 @@ static void ioq_init(LaioQueue *io_q)
>>      io_q->size = MAX_QUEUED_IO;
>>      io_q->idx = 0;
>>      io_q->plugged = 0;
>> +    io_q->aborting = false;
>>  }
>>
>> +/* Always returns >= 0, meaning how many requests were submitted */
>>  static int ioq_submit(struct qemu_laio_state *s)
>>  {
>> -    int ret, i = 0;
>> +    int ret;
>>      int len = s->io_q.idx;
>>
>> -    do {
>> -        ret = io_submit(s->ctx, len, s->io_q.iocbs);
>> -    } while (i++ < 3 && ret == -EAGAIN);
>> -
>> -    /* empty io queue */
>> -    s->io_q.idx = 0;
>> +    if (!len) {
>> +        return 0;
>> +    }
>>
>> +    ret = io_submit(s->ctx, len, s->io_q.iocbs);
>>      if (ret < 0) {
>> -        i = 0;
>> -    } else {
>> -        i = ret;
>> +        /* retry in following completion cb */
>> +        if (ret == -EAGAIN) {
>> +            return 0;
>> +        }
>> +
>> +        /*
>> +         * Abort in BH context for avoiding Co-routine re-entered,
>> +         * and update io queue at that time
>> +         */
>> +        qemu_bh_schedule(s->io_q.abort_bh);
>> +        s->io_q.aborting = true;
>> +        ret = 0;
>>      }
>>
>> -    for (; i < len; i++) {
>> -        struct qemu_laiocb *laiocb =
>> -            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
>> +    /*
>> +     * update io queue, and retry will be started automatically
>> +     * in following completion cb for the remainder
>> +     */
>> +    if (ret > 0) {
>> +        if (ret < len) {
>> +            memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
>> +                    (len - ret) * sizeof(struct iocb *));
>> +        }
>> +        s->io_q.idx -= ret;
>> +    }
>
> Support for partly handled queues is nice, but a logically separate
> change. Please move this to its own patch.

As I described above, the "Co-routine re-entered recursively" issue is
triggered by the current handling of partial completion.

>> -        laiocb->ret = (ret < 0) ? ret : -EIO;
>> +    return ret;
>> +}
>> +
>> +static void ioq_abort_bh(void *opaque)
>> +{
>> +    struct qemu_laio_state *s = opaque;
>> +    int i;
>> +
>> +    for (i = 0; i < s->io_q.idx; i++) {
>> +        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
>> +                                                  struct qemu_laiocb,
>> +                                                  iocb);
>> +        laiocb->ret = -EIO;
>
> We shouldn't be throwing the real error code away.

OK, will do that.
>>          qemu_laio_process_completion(s, laiocb);
>>      }
>> -    return ret;
>> +
>> +    s->io_q.idx = 0;
>> +    s->io_q.aborting = false;
>>  }
>>
>> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>>  {
>>      unsigned int idx = s->io_q.idx;
>>
>> +    /* Request can't be allowed to submit until aborting is handled */
>> +    if (unlikely(s->io_q.aborting)) {
>> +        return -EIO;
>> +    }
>> +
>> +    if (unlikely(idx == s->io_q.size)) {
>> +        ioq_submit(s);
>> +
>> +        if (unlikely(s->io_q.aborting)) {
>> +            return -EIO;
>> +        }
>> +        idx = s->io_q.idx;
>> +    }
>> +
>> +    /* It has to return now if queue is still full */
>> +    if (unlikely(idx == s->io_q.size)) {
>> +        return -EAGAIN;
>> +    }
>> +
>>      s->io_q.iocbs[idx++] = iocb;
>>      s->io_q.idx = idx;
>>
>> -    /* submit immediately if queue is full */
>> -    if (idx == s->io_q.size) {
>> +    /* submit immediately if queue depth is above 2/3 */
>> +    if (idx > s->io_q.size * 2 / 3) {
>>          ioq_submit(s);
>>      }
>
> This change looks independent as well. Also isn't mentioned in the
> commit message. Why is it a good idea to use only 2/3 of the queue
> instead of making use of its full length?

When the queue is full, a new submission has to return failure, so
leaving some headroom makes linux-aio more reliable.

Thanks,
Ming Lei
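P.S. For readers following the thread, the submit-path policy argued for
above (keep the queue and retry later on -EAGAIN, flag the queue for
abort in BH context on other errors, shift the unsubmitted tail to the
front on partial submission) can be modelled in isolation. This is only
a hypothetical sketch, not the patch code; the struct and function names
below are invented for illustration:

```c
#include <errno.h>
#include <string.h>

#define MAX_QUEUED_IO 128

/* Invented stand-in for the LaioQueue fields discussed above */
struct ioq {
    void *iocbs[MAX_QUEUED_IO];
    unsigned int idx;       /* number of queued requests */
    int aborting;           /* abort BH has been scheduled */
};

/*
 * Apply an io_submit()-style return value 'ret' to a queue of
 * q->idx pending requests:
 *   ret == -EAGAIN        -> keep the queue untouched, retry later
 *   ret < 0 (other error) -> mark the queue as aborting
 *   0 < ret < q->idx      -> partial submission: move the remainder
 *                            to the front so a later retry picks it up
 *   ret == q->idx         -> everything was submitted
 * Always returns >= 0: the number of requests actually submitted.
 */
static int handle_submit_ret(struct ioq *q, int ret)
{
    int len = q->idx;

    if (ret == -EAGAIN) {
        return 0;            /* retried from the completion BH later */
    }
    if (ret < 0) {
        q->aborting = 1;     /* the abort BH will fail the requests */
        return 0;
    }
    if (ret < len) {
        memmove(&q->iocbs[0], &q->iocbs[ret],
                (len - ret) * sizeof(q->iocbs[0]));
    }
    q->idx -= ret;
    return ret;
}
```

The key point the model makes visible is that no path completes a
request directly from the submit path: every outcome either defers to
the completion BH or to the abort BH, which is what avoids re-entering
the coroutine.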