On Wed, Jul 18, 2012 at 04:07:38PM +0100, Stefan Hajnoczi wrote: > RHEL6 and other new guest kernels use indirect vring descriptors to > increase the number of requests that can be batched. This fundamentally > changes vring from a scheme that requires fixed resources to something > more dynamic (although there is still an absolute maximum number of > descriptors). Cope with indirect vrings by taking on as many requests > as we can in one go and then postponing the remaining requests until the > first batch completes. > > It would be possible to switch to dynamic resource management so iovec > and iocb structs are malloced. This would allow the entire ring to be > processed even with indirect descriptors, but would probably hit a > bottleneck when io_submit refuses to queue more requests. Therefore, > stick with the simpler scheme for now. > > Unfortunately Linux AIO does not support asynchronous fsync/fdatasync on > all files. In particular, an O_DIRECT opened file on ext4 does not > support Linux AIO fdsync. Work around this by performing fdatasync() > synchronously for now. 
> > Signed-off-by: Stefan Hajnoczi <stefa...@linux.vnet.ibm.com> > --- > hw/dataplane/ioq.h | 18 ++++----- > hw/dataplane/vring.h | 103 > +++++++++++++++++++++++++++++++++++++++++++------- > hw/virtio-blk.c | 75 ++++++++++++++++++++++-------------- > 3 files changed, 144 insertions(+), 52 deletions(-) > > diff --git a/hw/dataplane/ioq.h b/hw/dataplane/ioq.h > index 7200e87..d1545d6 100644 > --- a/hw/dataplane/ioq.h > +++ b/hw/dataplane/ioq.h > @@ -3,7 +3,7 @@ > > typedef struct { > int fd; /* file descriptor */ > - unsigned int max_reqs; /* max length of freelist and queue */ > + unsigned int max_reqs; /* max length of freelist and queue */ > > io_context_t io_ctx; /* Linux AIO context */ > EventNotifier io_notifier; /* Linux AIO eventfd */ > @@ -91,18 +91,16 @@ static struct iocb *ioq_rdwr(IOQueue *ioq, bool read, > struct iovec *iov, unsigne > return iocb; > } > > -static struct iocb *ioq_fdsync(IOQueue *ioq) > -{ > - struct iocb *iocb = ioq_get_iocb(ioq); > - > - io_prep_fdsync(iocb, ioq->fd); > - io_set_eventfd(iocb, event_notifier_get_fd(&ioq->io_notifier)); > - return iocb; > -} > - > static int ioq_submit(IOQueue *ioq) > { > int rc = io_submit(ioq->io_ctx, ioq->queue_idx, ioq->queue); > + if (unlikely(rc < 0)) { > + unsigned int i; > + fprintf(stderr, "io_submit io_ctx=%#lx nr=%d iovecs=%p\n", > (uint64_t)ioq->io_ctx, ioq->queue_idx, ioq->queue); > + for (i = 0; i < ioq->queue_idx; i++) { > + fprintf(stderr, "[%u] type=%#x fd=%d\n", i, > ioq->queue[i]->aio_lio_opcode, ioq->queue[i]->aio_fildes); > + } > + } > ioq->queue_idx = 0; /* reset */ > return rc; > } > diff --git a/hw/dataplane/vring.h b/hw/dataplane/vring.h > index 70675e5..3eab4b4 100644 > --- a/hw/dataplane/vring.h > +++ b/hw/dataplane/vring.h > @@ -64,6 +64,86 @@ static void vring_setup(Vring *vring, VirtIODevice *vdev, > int n) > vring->vr.desc, vring->vr.avail, vring->vr.used); > } > > +static bool vring_more_avail(Vring *vring) > +{ > + return vring->vr.avail->idx != vring->last_avail_idx; > +} 
> + > +/* This is stolen from linux-2.6/drivers/vhost/vhost.c. */
So please add a Red Hat copyright notice. > +static bool get_indirect(Vring *vring, > + struct iovec iov[], struct iovec *iov_end, > + unsigned int *out_num, unsigned int *in_num, > + struct vring_desc *indirect) > +{ > + struct vring_desc desc; > + unsigned int i = 0, count, found = 0; > + > + /* Sanity check */ > + if (unlikely(indirect->len % sizeof desc)) { > + fprintf(stderr, "Invalid length in indirect descriptor: " > + "len 0x%llx not multiple of 0x%zx\n", > + (unsigned long long)indirect->len, > + sizeof desc); > + exit(1); > + } > + > + count = indirect->len / sizeof desc; > + /* Buffers are chained via a 16 bit next field, so > + * we can have at most 2^16 of these. */ > + if (unlikely(count > USHRT_MAX + 1)) { > + fprintf(stderr, "Indirect buffer length too big: %d\n", > + indirect->len); > + exit(1); > + } > + > + /* Point to translate indirect desc chain */ > + indirect = phys_to_host(vring, indirect->addr); > + > + /* We will use the result as an address to read from, so most > + * architectures only need a compiler barrier here. */ > + __sync_synchronize(); /* read_barrier_depends(); */ > + > + do { > + if (unlikely(++found > count)) { > + fprintf(stderr, "Loop detected: last one at %u " > + "indirect size %u\n", > + i, count); > + exit(1); > + } > + > + desc = *indirect++; > + if (unlikely(desc.flags & VRING_DESC_F_INDIRECT)) { > + fprintf(stderr, "Nested indirect descriptor\n"); > + exit(1); > + } > + > + /* Stop for now if there are not enough iovecs available. */ > + if (iov >= iov_end) { > + return false; > + } > + > + iov->iov_base = phys_to_host(vring, desc.addr); > + iov->iov_len = desc.len; > + iov++; > + > + /* If this is an input descriptor, increment that count. */ > + if (desc.flags & VRING_DESC_F_WRITE) { > + *in_num += 1; > + } else { > + /* If it's an output descriptor, they're all supposed > + * to come before any input descriptors. 
*/ > + if (unlikely(*in_num)) { > + fprintf(stderr, "Indirect descriptor " > + "has out after in: idx %d\n", i); > + exit(1); > + } > + *out_num += 1; > + } > + i = desc.next; > + } while (desc.flags & VRING_DESC_F_NEXT); > + return true; > +} > + > /* This looks in the virtqueue and for the first available buffer, and > converts > * it to an iovec for convenient access. Since descriptors consist of some > * number of output then some number of input descriptors, it's actually two > @@ -129,23 +209,20 @@ static unsigned int vring_pop(Vring *vring, > } > desc = vring->vr.desc[i]; > if (desc.flags & VRING_DESC_F_INDIRECT) { > -/* ret = get_indirect(dev, vq, iov, iov_size, > - out_num, in_num, > - log, log_num, &desc); > - if (unlikely(ret < 0)) { > - vq_err(vq, "Failure detected " > - "in indirect descriptor at idx %d\n", i); > - return ret; > - } > - continue; */ > - fprintf(stderr, "Indirect vring not supported\n"); > - exit(1); > + if (!get_indirect(vring, iov, iov_end, out_num, in_num, > &desc)) { > + return num; /* not enough iovecs, stop for now */ > + } > + continue; > } > > + /* If there are not enough iovecs left, stop for now. The caller > + * should check if there are more descs available once they have > dealt > + * with the current set. > + */ > if (iov >= iov_end) { > - fprintf(stderr, "Not enough vring iovecs\n"); > - exit(1); > + return num; > } > + > iov->iov_base = phys_to_host(vring, desc.addr); > iov->iov_len = desc.len; > iov++; > diff --git a/hw/virtio-blk.c b/hw/virtio-blk.c > index 52ea601..591eace 100644 > --- a/hw/virtio-blk.c > +++ b/hw/virtio-blk.c > @@ -62,6 +62,14 @@ static VirtIOBlock *to_virtio_blk(VirtIODevice *vdev) > return (VirtIOBlock *)vdev; > } > > +/* Normally the block driver passes down the fd, there's no way to get it > from > + * above. 
> + */ > +static int get_raw_posix_fd_hack(VirtIOBlock *s) > +{ > + return *(int*)s->bs->file->opaque; > +} > + > static void complete_request(struct iocb *iocb, ssize_t ret, void *opaque) > { > VirtIOBlock *s = opaque; > @@ -83,18 +91,6 @@ static void complete_request(struct iocb *iocb, ssize_t > ret, void *opaque) > vring_push(&s->vring, req->head, len + sizeof req->status); > } > > -static bool handle_io(EventHandler *handler) > -{ > - VirtIOBlock *s = container_of(handler, VirtIOBlock, io_handler); > - > - if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) { > - /* TODO is this thread-safe and can it be done faster? */ > - virtio_irq(s->vq); > - } > - > - return true; > -} > - > static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int > out_num, unsigned int in_num, unsigned int head) > { > /* Virtio block requests look like this: */ > @@ -117,13 +113,16 @@ static void process_request(IOQueue *ioq, struct iovec > iov[], unsigned int out_n > outhdr->type, outhdr->sector); > */ > > - if (unlikely(outhdr->type & ~(VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH))) { > + /* TODO Linux sets the barrier bit even when not advertised! */ > + uint32_t type = outhdr->type & ~VIRTIO_BLK_T_BARRIER; > + > + if (unlikely(type & ~(VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH))) { > fprintf(stderr, "virtio-blk unsupported request type %#x\n", > outhdr->type); > exit(1); > } > > struct iocb *iocb; > - switch (outhdr->type & (VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH)) { > + switch (type & (VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH)) { > case VIRTIO_BLK_T_IN: > if (unlikely(out_num != 1)) { > fprintf(stderr, "virtio-blk invalid read request\n"); > @@ -145,8 +144,16 @@ static void process_request(IOQueue *ioq, struct iovec > iov[], unsigned int out_n > fprintf(stderr, "virtio-blk invalid flush request\n"); > exit(1); > } > - iocb = ioq_fdsync(ioq); > - break; > + > + /* TODO fdsync is not supported by all backends, do it synchronously > here! 
*/ > + { > + VirtIOBlock *s = container_of(ioq, VirtIOBlock, ioqueue); > + fdatasync(get_raw_posix_fd_hack(s)); > + inhdr->status = VIRTIO_BLK_S_OK; > + vring_push(&s->vring, head, sizeof *inhdr); > + virtio_irq(s->vq); > + } > + return; > > default: > fprintf(stderr, "virtio-blk multiple request type bits set\n"); > @@ -199,11 +206,29 @@ static bool handle_notify(EventHandler *handler) > } > > /* Submit requests, if any */ > - if (likely(iov != iovec)) { > - if (unlikely(ioq_submit(&s->ioqueue) < 0)) { > - fprintf(stderr, "ioq_submit failed\n"); > - exit(1); > - } > + int rc = ioq_submit(&s->ioqueue); > + if (unlikely(rc < 0)) { > + fprintf(stderr, "ioq_submit failed %d\n", rc); > + exit(1); > + } > + return true; > +} > + > +static bool handle_io(EventHandler *handler) > +{ > + VirtIOBlock *s = container_of(handler, VirtIOBlock, io_handler); > + > + if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) { > + /* TODO is this thread-safe and can it be done faster? */ > + virtio_irq(s->vq); > + } > + > + /* If there were more requests than iovecs, the vring will not be empty > yet > + * so check again. There should now be enough resources to process more > + * requests. > + */ > + if (vring_more_avail(&s->vring)) { > + return handle_notify(&s->notify_handler); > } > > return true; > @@ -217,14 +242,6 @@ static void *data_plane_thread(void *opaque) > return NULL; > } > > -/* Normally the block driver passes down the fd, there's no way to get it > from > - * above. > - */ > -static int get_raw_posix_fd_hack(VirtIOBlock *s) > -{ > - return *(int*)s->bs->file->opaque; > -} > - > static void data_plane_start(VirtIOBlock *s) > { > int i; > -- > 1.7.10.4