On Wed, Jul 18, 2012 at 04:07:38PM +0100, Stefan Hajnoczi wrote:
> RHEL6 and other new guest kernels use indirect vring descriptors to
> increase the number of requests that can be batched.  This fundamentally
> changes vring from a scheme that requires fixed resources to something
> more dynamic (although there is still an absolute maximum number of
> descriptors).  Cope with indirect vrings by taking on as many requests
> as we can in one go and then postponing the remaining requests until the
> first batch completes.
> 
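(Just to make the resource implications concrete for anyone following along:
the snippet below is an illustration, not code from the patch.  The vring_desc
layout is the one from the virtio spec; the ring and indirect table sizes are
made-up examples.)

#include <stdio.h>
#include <stdint.h>

struct vring_desc {            /* per the virtio spec / <linux/virtio_ring.h> */
    uint64_t addr;
    uint32_t len;
    uint16_t flags;
    uint16_t next;
};

int main(void)
{
    unsigned ring_size = 128;      /* example virtio-blk vring size */
    unsigned table_len = 4096;     /* example indirect table: one guest page */
    unsigned per_entry = table_len / sizeof(struct vring_desc);

    /* Without indirect descriptors the ring itself caps the segment count at
     * ring_size.  With them, each ring entry can expand into a descriptor
     * table of its own, so a fixed iovec/iocb pool sized to the ring is no
     * longer enough. */
    printf("segments per indirect entry: %u\n", per_entry);   /* 256 */
    printf("worst case for the whole ring: %u\n", ring_size * per_entry);
    return 0;
}
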
> It would be possible to switch to dynamic resource management so iovec
> and iocb structs are malloced.  This would allow the entire ring to be
> processed even with indirect descriptors, but would probably hit a
> bottleneck when io_submit refuses to queue more requests.  Therefore,
> stick with the simpler scheme for now.
> 
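(A rough sketch of what the malloc'ed variant could look like, reusing the
IOQueue fields visible in ioq.h below.  The function name is made up and none
of this is in the patch; the completion path would also have to g_free() the
iocb and the iovec copy.)

/* Hypothetical per-request allocation instead of the fixed freelist */
static struct iocb *ioq_rdwr_dynamic(IOQueue *ioq, bool read,
                                     struct iovec *iov, unsigned int count,
                                     long long offset)
{
    struct iocb *iocb = g_malloc(sizeof(*iocb));
    struct iovec *iov_copy = g_memdup(iov, sizeof(iov[0]) * count);

    if (read) {
        io_prep_preadv(iocb, ioq->fd, iov_copy, count, offset);
    } else {
        io_prep_pwritev(iocb, ioq->fd, iov_copy, count, offset);
    }
    io_set_eventfd(iocb, event_notifier_get_fd(&ioq->io_notifier));
    return iocb;
}

Even then the io_submit() bottleneck mentioned above remains: once the AIO
context cannot queue more iocbs it typically returns -EAGAIN, and the extra
queue depth buys nothing.
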
> Unfortunately Linux AIO does not support asynchronous fsync/fdatasync on
> all files.  In particular, an O_DIRECT opened file on ext4 does not
> support Linux AIO fdsync.  Work around this by performing fdatasync()
> synchronously for now.
> 
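(If anyone wants to check their own host/backend combination, a standalone
probe along these lines -- not part of the patch -- shows whether io_submit()
accepts an fdsync iocb; on file/filesystem combinations without AIO fsync
support it typically fails with EINVAL.)

/* Build with: gcc -D_GNU_SOURCE -o aio-fdsync-probe aio-fdsync-probe.c -laio */
#include <libaio.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(int argc, char **argv)
{
    io_context_t ctx = 0;
    struct iocb iocb, *iocbs[1] = { &iocb };
    int fd, rc;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <file>\n", argv[0]);
        return 1;
    }
    fd = open(argv[1], O_RDWR | O_DIRECT);
    if (fd < 0) {
        perror("open");
        return 1;
    }
    rc = io_setup(16, &ctx);        /* libaio returns -errno on failure */
    if (rc < 0) {
        fprintf(stderr, "io_setup: %s\n", strerror(-rc));
        return 1;
    }
    io_prep_fdsync(&iocb, fd);
    rc = io_submit(ctx, 1, iocbs);  /* 1 if queued, -errno if rejected */
    if (rc < 0) {
        printf("fdsync rejected: %s\n", strerror(-rc));
    } else {
        printf("fdsync queued, backend supports it\n");
    }
    io_destroy(ctx);
    close(fd);
    return 0;
}
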
> Signed-off-by: Stefan Hajnoczi <stefa...@linux.vnet.ibm.com>
> ---
>  hw/dataplane/ioq.h   |   18 ++++-----
>  hw/dataplane/vring.h |  103 +++++++++++++++++++++++++++++++++++++++++++-------
>  hw/virtio-blk.c      |   75 ++++++++++++++++++++++--------------
>  3 files changed, 144 insertions(+), 52 deletions(-)
> 
> diff --git a/hw/dataplane/ioq.h b/hw/dataplane/ioq.h
> index 7200e87..d1545d6 100644
> --- a/hw/dataplane/ioq.h
> +++ b/hw/dataplane/ioq.h
> @@ -3,7 +3,7 @@
>  
>  typedef struct {
>      int fd;                         /* file descriptor */
> -    unsigned int max_reqs;           /* max length of freelist and queue */
> +    unsigned int max_reqs;          /* max length of freelist and queue */
>  
>      io_context_t io_ctx;            /* Linux AIO context */
>      EventNotifier io_notifier;      /* Linux AIO eventfd */
> @@ -91,18 +91,16 @@ static struct iocb *ioq_rdwr(IOQueue *ioq, bool read, struct iovec *iov, unsigne
>      return iocb;
>  }
>  
> -static struct iocb *ioq_fdsync(IOQueue *ioq)
> -{
> -    struct iocb *iocb = ioq_get_iocb(ioq);
> -
> -    io_prep_fdsync(iocb, ioq->fd);
> -    io_set_eventfd(iocb, event_notifier_get_fd(&ioq->io_notifier));
> -    return iocb;
> -}
> -
>  static int ioq_submit(IOQueue *ioq)
>  {
>      int rc = io_submit(ioq->io_ctx, ioq->queue_idx, ioq->queue);
> +    if (unlikely(rc < 0)) {
> +        unsigned int i;
> +        fprintf(stderr, "io_submit io_ctx=%#lx nr=%d iovecs=%p\n", (uint64_t)ioq->io_ctx, ioq->queue_idx, ioq->queue);
> +        for (i = 0; i < ioq->queue_idx; i++) {
> +            fprintf(stderr, "[%u] type=%#x fd=%d\n", i, ioq->queue[i]->aio_lio_opcode, ioq->queue[i]->aio_fildes);
> +        }
> +    }
>      ioq->queue_idx = 0; /* reset */
>      return rc;
>  }
> diff --git a/hw/dataplane/vring.h b/hw/dataplane/vring.h
> index 70675e5..3eab4b4 100644
> --- a/hw/dataplane/vring.h
> +++ b/hw/dataplane/vring.h
> @@ -64,6 +64,86 @@ static void vring_setup(Vring *vring, VirtIODevice *vdev, int n)
>              vring->vr.desc, vring->vr.avail, vring->vr.used);
>  }
>  
> +static bool vring_more_avail(Vring *vring)
> +{
> +     return vring->vr.avail->idx != vring->last_avail_idx;
> +}
> +
> +/* This is stolen from linux-2.6/drivers/vhost/vhost.c. */
> +static bool get_indirect(Vring *vring,
> +                     struct iovec iov[], struct iovec *iov_end,
> +                     unsigned int *out_num, unsigned int *in_num,
> +                     struct vring_desc *indirect)
> +{
> +     struct vring_desc desc;
> +     unsigned int i = 0, count, found = 0;
> +
> +     /* Sanity check */
> +     if (unlikely(indirect->len % sizeof desc)) {
> +             fprintf(stderr, "Invalid length in indirect descriptor: "
> +                    "len 0x%llx not multiple of 0x%zx\n",
> +                    (unsigned long long)indirect->len,
> +                    sizeof desc);
> +             exit(1);
> +     }
> +
> +     count = indirect->len / sizeof desc;
> +     /* Buffers are chained via a 16 bit next field, so
> +      * we can have at most 2^16 of these. */
> +     if (unlikely(count > USHRT_MAX + 1)) {
> +             fprintf(stderr, "Indirect buffer length too big: %d\n",
> +                    indirect->len);
> +        exit(1);
> +     }
> +
> +    /* Point to translate indirect desc chain */
> +    indirect = phys_to_host(vring, indirect->addr);
> +
> +     /* We will use the result as an address to read from, so most
> +      * architectures only need a compiler barrier here. */
> +     __sync_synchronize(); /* read_barrier_depends(); */


QEMU has its own barrier macros now, please use them.
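
Something along these lines is what that would look like -- assuming smp_rmb()
is available from qemu-barrier.h at this point; if the tree only has barrier()
and smp_wmb() here, a plain barrier() already covers the compiler-barrier case
the comment describes:

    /* We will use the result as an address to read from, so most
     * architectures only need a compiler barrier here. */
    smp_rmb();    /* instead of __sync_synchronize() */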

> +
> +     do {
> +             if (unlikely(++found > count)) {
> +                     fprintf(stderr, "Loop detected: last one at %u "
> +                            "indirect size %u\n",
> +                            i, count);
> +                     exit(1);
> +             }
> +
> +        desc = *indirect++;
> +             if (unlikely(desc.flags & VRING_DESC_F_INDIRECT)) {
> +                     fprintf(stderr, "Nested indirect descriptor\n");
> +            exit(1);
> +             }
> +
> +        /* Stop for now if there are not enough iovecs available. */
> +        if (iov >= iov_end) {
> +            return false;
> +        }
> +
> +        iov->iov_base = phys_to_host(vring, desc.addr);
> +        iov->iov_len  = desc.len;
> +        iov++;
> +
> +             /* If this is an input descriptor, increment that count. */
> +             if (desc.flags & VRING_DESC_F_WRITE) {
> +                     *in_num += 1;
> +             } else {
> +                     /* If it's an output descriptor, they're all supposed
> +                      * to come before any input descriptors. */
> +                     if (unlikely(*in_num)) {
> +                             fprintf(stderr, "Indirect descriptor "
> +                                    "has out after in: idx %d\n", i);
> +                exit(1);
> +                     }
> +                     *out_num += 1;
> +             }
> +        i = desc.next;
> +     } while (desc.flags & VRING_DESC_F_NEXT);
> +    return true;
> +}
> +
>  /* This looks in the virtqueue and for the first available buffer, and converts
>   * it to an iovec for convenient access.  Since descriptors consist of some
>   * number of output then some number of input descriptors, it's actually two
> @@ -129,23 +209,20 @@ static unsigned int vring_pop(Vring *vring,
>               }
>          desc = vring->vr.desc[i];
>               if (desc.flags & VRING_DESC_F_INDIRECT) {
> -/*                   ret = get_indirect(dev, vq, iov, iov_size,
> -                                        out_num, in_num,
> -                                        log, log_num, &desc);
> -                     if (unlikely(ret < 0)) {
> -                             vq_err(vq, "Failure detected "
> -                                    "in indirect descriptor at idx %d\n", i);
> -                             return ret;
> -                     }
> -                     continue; */
> -            fprintf(stderr, "Indirect vring not supported\n");
> -            exit(1);
> +                     if (!get_indirect(vring, iov, iov_end, out_num, in_num, &desc)) {
> +                return num; /* not enough iovecs, stop for now */
> +            }
> +            continue;
>               }
>  
> +        /* If there are not enough iovecs left, stop for now.  The caller
> +         * should check if there are more descs available once they have dealt
> +         * with the current set.
> +         */
>          if (iov >= iov_end) {
> -            fprintf(stderr, "Not enough vring iovecs\n");
> -            exit(1);
> +            return num;
>          }
> +
>          iov->iov_base = phys_to_host(vring, desc.addr);
>          iov->iov_len  = desc.len;
>          iov++;
> diff --git a/hw/virtio-blk.c b/hw/virtio-blk.c
> index 52ea601..591eace 100644
> --- a/hw/virtio-blk.c
> +++ b/hw/virtio-blk.c
> @@ -62,6 +62,14 @@ static VirtIOBlock *to_virtio_blk(VirtIODevice *vdev)
>      return (VirtIOBlock *)vdev;
>  }
>  
> +/* Normally the block driver passes down the fd, there's no way to get it from
> + * above.
> + */
> +static int get_raw_posix_fd_hack(VirtIOBlock *s)
> +{
> +    return *(int*)s->bs->file->opaque;
> +}
> +
>  static void complete_request(struct iocb *iocb, ssize_t ret, void *opaque)
>  {
>      VirtIOBlock *s = opaque;
> @@ -83,18 +91,6 @@ static void complete_request(struct iocb *iocb, ssize_t ret, void *opaque)
>      vring_push(&s->vring, req->head, len + sizeof req->status);
>  }
>  
> -static bool handle_io(EventHandler *handler)
> -{
> -    VirtIOBlock *s = container_of(handler, VirtIOBlock, io_handler);
> -
> -    if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) {
> -        /* TODO is this thread-safe and can it be done faster? */
> -        virtio_irq(s->vq);
> -    }
> -
> -    return true;
> -}
> -
>  static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_num, unsigned int in_num, unsigned int head)
>  {
>      /* Virtio block requests look like this: */
> @@ -117,13 +113,16 @@ static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_n
>              outhdr->type, outhdr->sector);
>      */
>  
> -    if (unlikely(outhdr->type & ~(VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH))) {
> +    /* TODO Linux sets the barrier bit even when not advertised! */
> +    uint32_t type = outhdr->type & ~VIRTIO_BLK_T_BARRIER;
> +
> +    if (unlikely(type & ~(VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH))) {
>          fprintf(stderr, "virtio-blk unsupported request type %#x\n", 
> outhdr->type);
>          exit(1);
>      }
>  
>      struct iocb *iocb;
> -    switch (outhdr->type & (VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH)) {
> +    switch (type & (VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH)) {
>      case VIRTIO_BLK_T_IN:
>          if (unlikely(out_num != 1)) {
>              fprintf(stderr, "virtio-blk invalid read request\n");
> @@ -145,8 +144,16 @@ static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_n
>              fprintf(stderr, "virtio-blk invalid flush request\n");
>              exit(1);
>          }
> -        iocb = ioq_fdsync(ioq);
> -        break;
> +
> +        /* TODO fdsync is not supported by all backends, do it synchronously here! */
> +        {
> +            VirtIOBlock *s = container_of(ioq, VirtIOBlock, ioqueue);
> +            fdatasync(get_raw_posix_fd_hack(s));
> +            inhdr->status = VIRTIO_BLK_S_OK;
> +            vring_push(&s->vring, head, sizeof *inhdr);
> +            virtio_irq(s->vq);
> +        }
> +        return;
>  
>      default:
>          fprintf(stderr, "virtio-blk multiple request type bits set\n");
> @@ -199,11 +206,29 @@ static bool handle_notify(EventHandler *handler)
>      }
>  
>      /* Submit requests, if any */
> -    if (likely(iov != iovec)) {
> -        if (unlikely(ioq_submit(&s->ioqueue) < 0)) {
> -            fprintf(stderr, "ioq_submit failed\n");
> -            exit(1);
> -        }
> +    int rc = ioq_submit(&s->ioqueue);
> +    if (unlikely(rc < 0)) {
> +        fprintf(stderr, "ioq_submit failed %d\n", rc);
> +        exit(1);
> +    }
> +    return true;
> +}
> +
> +static bool handle_io(EventHandler *handler)
> +{
> +    VirtIOBlock *s = container_of(handler, VirtIOBlock, io_handler);
> +
> +    if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) {
> +        /* TODO is this thread-safe and can it be done faster? */
> +        virtio_irq(s->vq);
> +    }
> +
> +    /* If there were more requests than iovecs, the vring will not be empty yet
> +     * so check again.  There should now be enough resources to process more
> +     * requests.
> +     */
> +    if (vring_more_avail(&s->vring)) {
> +        return handle_notify(&s->notify_handler);
>      }
>  
>      return true;
> @@ -217,14 +242,6 @@ static void *data_plane_thread(void *opaque)
>      return NULL;
>  }
>  
> -/* Normally the block driver passes down the fd, there's no way to get it from
> - * above.
> - */
> -static int get_raw_posix_fd_hack(VirtIOBlock *s)
> -{
> -    return *(int*)s->bs->file->opaque;
> -}
> -
>  static void data_plane_start(VirtIOBlock *s)
>  {
>      int i;
> -- 
> 1.7.10.4
