Also for Linux AIO, don't call callbacks that don't belong to the active AsyncContext.
Signed-off-by: Kevin Wolf <kw...@redhat.com>
---
 linux-aio.c |   87 ++++++++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 72 insertions(+), 15 deletions(-)

diff --git a/linux-aio.c b/linux-aio.c
index 19ad36d..5e892b0 100644
--- a/linux-aio.c
+++ b/linux-aio.c
@@ -31,12 +31,15 @@ struct qemu_laiocb {
     struct iocb iocb;
     ssize_t ret;
     size_t nbytes;
+    int async_context_id;
+    QLIST_ENTRY(qemu_laiocb) node;
 };
 
 struct qemu_laio_state {
     io_context_t ctx;
     int efd;
     int count;
+    QLIST_HEAD(, qemu_laiocb) completed_reqs;
 };
 
 static inline ssize_t io_event_ret(struct io_event *ev)
@@ -44,6 +47,69 @@ static inline ssize_t io_event_ret(struct io_event *ev)
     return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
 }
 
+/*
+ * Completes an AIO request (calls the callback and frees the ACB).
+ * Be sure to be in the right AsyncContext before calling this function.
+ */
+static void qemu_laio_process_completion(struct qemu_laio_state *s,
+    struct qemu_laiocb *laiocb)
+{
+    int ret;
+
+    s->count--;
+
+    ret = laiocb->ret;
+    if (ret != -ECANCELED) {
+        if (ret == laiocb->nbytes)
+            ret = 0;
+        else if (ret >= 0)
+            ret = -EINVAL;
+
+        laiocb->common.cb(laiocb->common.opaque, ret);
+    }
+
+    qemu_aio_release(laiocb);
+}
+
+/*
+ * Processes all queued AIO requests, i.e. requests that have returned from
+ * the OS but whose callback was not called yet. Requests whose callback
+ * cannot be called in the current AsyncContext remain in the queue.
+ *
+ * Returns 1 if at least one request could be completed, 0 otherwise.
+ */
+static int qemu_laio_process_requests(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+    struct qemu_laiocb *laiocb, *next;
+    int res = 0;
+
+    QLIST_FOREACH_SAFE (laiocb, &s->completed_reqs, node, next) {
+        if (laiocb->async_context_id == get_async_context_id()) {
+            QLIST_REMOVE(laiocb, node); /* unlink before ACB is released */
+            qemu_laio_process_completion(s, laiocb);
+            res = 1;
+        }
+    }
+
+    return res;
+}
+
+/*
+ * Puts a request in the completion queue so that its callback is called the
+ * next time this is possible. If we already are in the right AsyncContext,
+ * the request is completed immediately instead.
+ */
+static void qemu_laio_enqueue_completed(struct qemu_laio_state *s,
+    struct qemu_laiocb *laiocb)
+{
+    if (laiocb->async_context_id == get_async_context_id()) {
+        qemu_laio_process_completion(s, laiocb);
+    } else {
+        QLIST_INSERT_HEAD(&s->completed_reqs, laiocb, node);
+    }
+}
+
 static void qemu_laio_completion_cb(void *opaque)
 {
     struct qemu_laio_state *s = opaque;
@@ -74,19 +140,8 @@ static void qemu_laio_completion_cb(void *opaque)
             struct qemu_laiocb *laiocb =
                     container_of(iocb, struct qemu_laiocb, iocb);
 
-            s->count--;
-
-            ret = laiocb->ret = io_event_ret(&events[i]);
-            if (ret != -ECANCELED) {
-                if (ret == laiocb->nbytes)
-                    ret = 0;
-                else if (ret >= 0)
-                    ret = -EINVAL;
-
-                laiocb->common.cb(laiocb->common.opaque, ret);
-            }
-
-            qemu_aio_release(laiocb);
+            laiocb->ret = io_event_ret(&events[i]);
+            qemu_laio_enqueue_completed(s, laiocb);
         }
     }
 }
@@ -149,6 +204,7 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
     laiocb->nbytes = nb_sectors * 512;
     laiocb->ctx = s;
     laiocb->ret = -EINPROGRESS;
+    laiocb->async_context_id = get_async_context_id();
 
     iocbs = &laiocb->iocb;
 
@@ -183,6 +239,7 @@ void *laio_init(void)
     struct qemu_laio_state *s;
 
     s = qemu_mallocz(sizeof(*s));
+    QLIST_INIT(&s->completed_reqs);
     s->efd = eventfd(0, 0);
     if (s->efd == -1)
         goto out_free_state;
@@ -191,8 +248,8 @@ void *laio_init(void)
     if (io_setup(MAX_EVENTS, &s->ctx) != 0)
         goto out_close_efd;
 
-    qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb,
-        NULL, qemu_laio_flush_cb, NULL, s);
+    qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb, NULL,
+        qemu_laio_flush_cb, qemu_laio_process_requests, s);
 
     return s;
 
-- 
1.6.2.5