On Tue, Nov 30, 2010 at 12:48 PM, Kevin Wolf <kw...@redhat.com> wrote: > +/* > + * Adds a write request to the queue. > + */ > +int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf, > + uint64_t size) > +{ > + BlockQueue *bq = context->bq; > + BlockQueueRequest *section_req; > + bool completed; > + > + /* Don't use the queue for writethrough images */ > + if ((bq->bs->open_flags & WRITEBACK_MODES) == 0) { > + return bdrv_pwrite(bq->bs, offset, buf, size); > + } > + > + /* First check if there are any pending writes for the same data. */ > + DPRINTF("-- pwrite: [%#lx + %ld]\n", offset, size); > + completed = blkqueue_check_queue_overlap(context, &bq->queue, &offset, > + &buf, &size, &blkqueue_pwrite, &pwrite_handle_overlap, > + context->section); > + > + if (completed) { > + return 0; > + } > + > + /* Create request structure */ > + BlockQueueRequest *req = qemu_malloc(sizeof(*req)); > + QLIST_INIT(&req->acbs); > + req->type = REQ_TYPE_WRITE; > + req->bq = bq; > + req->offset = offset; > + req->size = size; > + req->buf = qemu_malloc(size);
qemu_blockalign() > +static int insert_barrier(BlockQueueContext *context, BlockQueueAIOCB *acb) > +{ > + BlockQueue *bq = context->bq; > + BlockQueueRequest *section_req; > + > + bq->barriers_requested++; > + > + /* Create request structure */ > + BlockQueueRequest *req = qemu_malloc(sizeof(*req)); > + QLIST_INIT(&req->acbs); > + req->type = REQ_TYPE_BARRIER; > + req->bq = bq; > + req->section = context->section; > + req->buf = NULL; > + > + /* Find another barrier to merge with. */ > + QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) { > + if (section_req->section >= req->section) { > + > + /* > + * If acb is set, the intention of the barrier request is to > flush > + * the complete queue and notify the caller when all requests > have > + * been processed. To achieve this, we may only merge with the > very > + * last request in the queue. > + */ > + if (acb && QTAILQ_NEXT(section_req, link)) { > + continue; > + } > + > + req->section = section_req->section; > + context->section = section_req->section + 1; > + qemu_free(req); > + req = section_req; > + goto out; > + } > + } I think the search for an existing barrier should be moved above the req allocation. It's more work to allocate req, do an unnecessary req->section = section_req->section, and then free it again. > + > + /* > + * If there wasn't a barrier for the same section yet, insert a new one > at > + * the end. > + */ > + DPRINTF("queue-ins flush: %p\n", req); > + QTAILQ_INSERT_TAIL(&bq->queue, req, link); > + QSIMPLEQ_INSERT_TAIL(&bq->sections, req, link_section); > + bq->queue_size++; > + context->section++; > + > + bq->barriers_submitted++; > + > + /* > + * At this point, req is either the newly inserted request, or a > previously > + * existing barrier with which the current request has been merged. > + * > + * Insert the ACB in the list of that request so that the callback is > + * called when the request has completed. 
> + */ > +out: > + if (acb) { > + QLIST_INSERT_HEAD(&req->acbs, acb, link); Is there a reason to insert at the head and not append to the tail? Preserving order is usually good. > +/* > + * If there are any blkqueue_aio_flush callbacks pending, call them with ret > + * as the error code and remove them from the queue. > + * > + * If keep_queue is false, all requests are removed from the queue > + */ > +static void blkqueue_fail_flush(BlockQueue *bq, int ret, bool keep_queue) > +{ > + BlockQueueRequest *req, *next_req; > + BlockQueueAIOCB *acb, *next_acb; > + > + QTAILQ_FOREACH_SAFE(req, &bq->queue, link, next_req) { > + > + /* Call and remove registered callbacks */ > + QLIST_FOREACH_SAFE(acb, &req->acbs, link, next_acb) { > + acb->common.cb(acb->common.opaque, ret); > + qemu_free(acb); qemu_aio_release() > + } > + QLIST_INIT(&req->acbs); > + > + /* If requested, remove the request itself */ > + if (!keep_queue) { > + QTAILQ_REMOVE(&bq->queue, req, link); bq->queue_size--; > + if (req->type == REQ_TYPE_BARRIER) { > + QSIMPLEQ_REMOVE(&bq->sections, req, BlockQueueRequest, > + link_section); > + } Now free the request? > + } > + } > + > + /* Make sure that blkqueue_flush stops running */ > + bq->flushing = ret; > +} > + > +static void blkqueue_process_request_cb(void *opaque, int ret) > +{ > + BlockQueueRequest *req = opaque; > + BlockQueue *bq = req->bq; > + BlockQueueAIOCB *acb, *next; > + > + DPRINTF(" done req: %p [%#lx + %ld]\n", req, req->offset, > req->size); > + > + /* Remove from in-flight list */ > + QTAILQ_REMOVE(&bq->in_flight, req, link); > + bq->in_flight_num--; > + > + /* > + * Error handling gets a bit complicated, because we have already > completed > + * the requests that went wrong. There are two ways of dealing with this: > + * > + * 1. With werror=stop we can put the request back into the queue and > stop > + * the VM. When the user continues the VM, the request is retried. > + * > + * 2. 
In other cases we need to return an error on the next bdrv_flush. > The > + * caller must cope with the fact that he doesn't know which of the > + * requests succeeded (i.e. invalidate all caches) > + * > + * If we're in an blkqueue_aio_flush, we must return an error in both > + * cases. If we stop the VM, we can clear bq->errno immediately again. > + * Otherwise, it's cleared in bdrv_(aio_)flush. > + */ > + if (ret < 0) { > + if (bq->error_ret != -ENOSPC) { > + bq->error_ret = ret; > + } > + } > + > + /* Call any callbacks attached to the request (see blkqueue_aio_flush) */ > + QLIST_FOREACH_SAFE(acb, &req->acbs, link, next) { > + acb->common.cb(acb->common.opaque, bq->error_ret); > + qemu_free(acb); qemu_aio_release() > + } > + QLIST_INIT(&req->acbs); > + > + /* Handle errors in the VM stop case */ > + if (ret < 0) { > + bool keep_queue = bq->error_handler(bq->error_opaque, ret); > + > + /* Fail any flushes that may wait for the queue to become empty */ > + blkqueue_fail_flush(bq, bq->error_ret, keep_queue); > + > + if (keep_queue) { > + /* Reinsert request into the queue */ > + QTAILQ_INSERT_HEAD(&bq->queue, req, link); > + if (req->type == REQ_TYPE_BARRIER) { > + QSIMPLEQ_INSERT_HEAD(&bq->sections, req, link_section); > + } > + > + /* Clear the error to restore a normal state after 'cont' */ > + bq->error_ret = 0; > + return; > + } > + } > + > + /* Cleanup */ > + blkqueue_free_request(req); > + > + /* Check if there are more requests to submit */ > + blkqueue_process_request(bq); > +} > + > +/* > + * Checks if the first request on the queue can run. If so, remove it from > the > + * queue, submit the request and put it onto the queue of in-flight requests. > + * > + * Returns 0 if a request has been submitted, -1 if no request can run or an > + * error has occurred. > + */ If we want specific error codes: -EAGAIN no request can run -EINVAL, -EIO, ... 
specific errors > +static int blkqueue_submit_request(BlockQueue *bq) > +{ > + BlockDriverAIOCB *acb; > + BlockQueueRequest *req; > + > + /* > + * If we had an error, we must not submit new requests from another > + * section or may we get ordering problems. In fact, not submitting any > new > + * requests looks like a good idea in this case. > + */ > + if (bq->error_ret) { > + return -1; > + } > + > + /* Fetch a request */ > + req = QTAILQ_FIRST(&bq->queue); > + if (req == NULL) { > + return -1; > + } > + > + /* Writethrough images aren't supposed to have any queue entries */ > + assert((bq->bs->open_flags & WRITEBACK_MODES) != 0); > + > + /* > + * We need to wait for completion before we can submit new requests: > + * 1. If we're currently processing a barrier, or the new request is a > + * barrier, we need to guarantee this barrier semantics. > + * 2. We must make sure that newer writes cannot pass older ones. > + */ > + if (bq->in_flight_num > 0) { I don't understand why we refuse to do more work on in_flight_num > 0. This function increments in_flight_num which means blkqueue_process_request() will only ever submit one request at a time? Why does blkqueue_process_request() loop then? I'm missing something. > + return -1; > + } > + > + /* Process barriers only if the queue is long enough */ > + if (!bq->flushing) { > + if (req->type == REQ_TYPE_BARRIER && bq->queue_size < 50) { > + return -1; > + } Not sure about this. Need to think about the patch more but this check looks like it could have consequences like starvation. I guess that's why you check for !bq->flushing. > +static void blkqueue_aio_cancel(BlockDriverAIOCB *blockacb) > +{ > + BlockQueueAIOCB *acb = (BlockQueueAIOCB*) blockacb; > + > + /* > + * We can't cancel the flush any more, but that doesn't hurt. We just > + * need to make sure that we don't call the callback when it completes. 
> + */ > + QLIST_REMOVE(acb, link); > + qemu_free(acb); qemu_aio_release() > +} > + > +/* > + * Inserts a barrier at the end of the queue (or merges with an existing > + * barrier there). Once the barrier has completed, the callback is called. > + */ > +BlockDriverAIOCB* blkqueue_aio_flush(BlockQueueContext *context, > + BlockDriverCompletionFunc *cb, void *opaque) > +{ > + BlockQueueAIOCB *acb; > + BlockDriverState *bs = context->bq->bs; > + int ret; > + > + /* Don't use the queue for writethrough images */ > + if ((bs->open_flags & WRITEBACK_MODES) == 0) { > + return bdrv_aio_flush(bs, cb, opaque); > + } > + > + /* Insert a barrier into the queue */ > + acb = qemu_aio_get(&blkqueue_aio_pool, NULL, cb, opaque); > + > + ret = insert_barrier(context, acb); > + if (ret < 0) { This return path is broken: > + cb(opaque, ret); Missing BH. > + qemu_free(acb); qemu_aio_release(acb); If we want to invoke cb (via a BH) then we shouldn't release acb. If we do release it we need to return NULL but shouldn't invoke cb. > + } > + > + return &acb->common; > +} > + > +/* > + * Flushes the queue (i.e. disables waiting for new requests to be batched) > and > + * waits until all requests in the queue have completed. > + * > + * Note that unlike blkqueue_aio_flush this does not call bdrv_flush(). > + */ > +int blkqueue_flush(BlockQueue *bq) > +{ > + int res = 0; > + > + bq->flushing = 1; > + > + /* Process any left over requests */ > + while ((bq->flushing > 0) && > + (bq->in_flight_num || QTAILQ_FIRST(&bq->queue))) > + { > + blkqueue_process_request(bq); > + qemu_aio_wait(); > + } > + > + /* > + * bq->flushing contains the error if it could be handled by stopping the > + * VM, error_ret contains it if we're not allowed to do this. > + */ > + if (bq->error_ret < 0) { > + res = bq->error_ret; > + > + /* > + * Wait for AIO requests, so that the queue is really unused after > + * blkqueue_flush() and the caller can destroy it > + */ > + if (res < 0) { We already know bq->error_ret < 0. 
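This comparison is always true: res was just assigned from bq->error_ret, which the enclosing if already established to be negative, so the inner check is redundant and can be dropped. Stefan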