> @@ -137,6 +145,12 @@ static void qemu_laio_completion_bh(void *opaque) > } > } > > +static void qemu_laio_start_retry(struct qemu_laio_state *s) > +{ > + if (s->io_q.idx) > + qemu_bh_schedule(s->io_q.retry); > +} > + > static void qemu_laio_completion_cb(EventNotifier *e) > { > struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e); > @@ -144,6 +158,7 @@ static void qemu_laio_completion_cb(EventNotifier *e) > if (event_notifier_test_and_clear(&s->e)) { > qemu_bh_schedule(s->completion_bh); > } > + qemu_laio_start_retry(s);
I think you do not even need two bottom halves. Just call ioq_submit from completion_bh instead, after the call to io_getevents. > } > > static void laio_cancel(BlockAIOCB *blockacb) > @@ -163,6 +178,9 @@ static void laio_cancel(BlockAIOCB *blockacb) > } > > laiocb->common.cb(laiocb->common.opaque, laiocb->ret); > + > + /* check if there are requests in io queue */ > + qemu_laio_start_retry(laiocb->ctx); > } > > static const AIOCBInfo laio_aiocb_info = { > @@ -177,45 +195,80 @@ static void ioq_init(LaioQueue *io_q) > io_q->plugged = 0; > } > > -static int ioq_submit(struct qemu_laio_state *s) > +static void abort_queue(struct qemu_laio_state *s) > +{ > + int i; > + for (i = 0; i < s->io_q.idx; i++) { > + struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i], > + struct qemu_laiocb, > + iocb); > + laiocb->ret = -EIO; > + qemu_laio_process_completion(s, laiocb); > + } > +} > + > +static int ioq_submit(struct qemu_laio_state *s, bool enqueue) > { > int ret, i = 0; > int len = s->io_q.idx; > + int j = 0; > > - do { > - ret = io_submit(s->ctx, len, s->io_q.iocbs); > - } while (i++ < 3 && ret == -EAGAIN); > + if (!len) { > + return 0; > + } > > - /* empty io queue */ > - s->io_q.idx = 0; > + ret = io_submit(s->ctx, len, s->io_q.iocbs); > + if (ret == -EAGAIN) { /* retry in following completion cb */ > + return 0; > + } else if (ret < 0) { > + if (enqueue) { > + return ret; > + } > > - if (ret < 0) { > - i = 0; > - } else { > - i = ret; > + /* in non-queue path, all IOs have to be completed */ > + abort_queue(s); > + ret = len; > + } else if (ret == 0) { > + goto out; No need for goto; just move the "for" loop inside this conditional. Or better, just use memmove. That is: if (ret < 0) { if (ret == -EAGAIN) { return 0; } if (enqueue) { return ret; } abort_queue(s); ret = len; } if (ret > 0) { memmove(...) s->io_q.idx -= ret; } return ret; > + * update io queue, for partial completion, retry will be > + * started automatically in following completion cb. > + */ > + s->io_q.idx -= ret; > + > return ret; > } > > -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) > +static void ioq_submit_retry(void *opaque) > +{ > + struct qemu_laio_state *s = opaque; > + ioq_submit(s, false); > +} > + > +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb) > { > unsigned int idx = s->io_q.idx; > > + if (unlikely(idx == s->io_q.size)) { > + return -1; return -EAGAIN? Thanks, Paolo > + } > + > s->io_q.iocbs[idx++] = iocb; > s->io_q.idx = idx; > > - /* submit immediately if queue is full */ > - if (idx == s->io_q.size) { > - ioq_submit(s); > + /* submit immediately if queue depth is above 2/3 */ > + if (idx > s->io_q.size * 2 / 3) { > + return ioq_submit(s, true); > } > + > + return 0; > } > > void laio_io_plug(BlockDriverState *bs, void *aio_ctx) > @@ -237,7 +290,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, > bool unplug) > } > > if (s->io_q.idx > 0) { > - ret = ioq_submit(s); > + ret = ioq_submit(s, false); > } > > return ret; > @@ -281,7 +334,9 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void > *aio_ctx, int fd, > goto out_free_aiocb; > } > } else { > - ioq_enqueue(s, iocbs); > + if (ioq_enqueue(s, iocbs) < 0) { > + goto out_free_aiocb; > + } > } > return &laiocb->common; > > @@ -296,12 +351,14 @@ void laio_detach_aio_context(void *s_, AioContext > *old_context) > > aio_set_event_notifier(old_context, &s->e, NULL); > qemu_bh_delete(s->completion_bh); > + qemu_bh_delete(s->io_q.retry); > } > > void laio_attach_aio_context(void *s_, AioContext *new_context) > { > struct qemu_laio_state *s = s_; > > + s->io_q.retry = aio_bh_new(new_context, ioq_submit_retry, s); > s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s); > aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb); > } >