On Mon, Nov 27, 2023 at 05:25:57PM -0300, Fabiano Rosas wrote:
> For the upcoming support to fixed-ram migration with multifd, we need
> to be able to accept an iovec array with non-contiguous data.
> 
> Add a pwritev and preadv version that splits the array into contiguous
> segments before writing. With that we can have the ram code continue
> to add pages in any order and the multifd code continue to send large
> arrays for reading and writing.
> 
> Signed-off-by: Fabiano Rosas <faro...@suse.de>
> ---
> - split the API that was merged into a single function
> - use uintptr_t for compatibility with 32-bit
> ---
>  include/io/channel.h | 26 ++++++++++++++++
>  io/channel.c         | 70 ++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 96 insertions(+)
> 
> diff --git a/include/io/channel.h b/include/io/channel.h
> index 7986c49c71..25383db5aa 100644
> --- a/include/io/channel.h
> +++ b/include/io/channel.h
> @@ -559,6 +559,19 @@ int qio_channel_close(QIOChannel *ioc,
>  ssize_t qio_channel_pwritev(QIOChannel *ioc, const struct iovec *iov,
>                              size_t niov, off_t offset, Error **errp);
>  
> +/**
> + * qio_channel_pwritev_all:
> + * @ioc: the channel object
> + * @iov: the array of memory regions to write data from
> + * @niov: the length of the @iov array
> + * @offset: the iovec offset in the file where to write the data
> + * @errp: pointer to a NULL-initialized error object
> + *
> + * Returns: 0 if all bytes were written, or -1 on error
> + */
> +int qio_channel_pwritev_all(QIOChannel *ioc, const struct iovec *iov,
> +                            size_t niov, off_t offset, Error **errp);
> +
>  /**
>   * qio_channel_pwrite
>   * @ioc: the channel object
> @@ -595,6 +608,19 @@ ssize_t qio_channel_pwrite(QIOChannel *ioc, char *buf, size_t buflen,
>  ssize_t qio_channel_preadv(QIOChannel *ioc, const struct iovec *iov,
>                             size_t niov, off_t offset, Error **errp);
>  
> +/**
> + * qio_channel_preadv_all:
> + * @ioc: the channel object
> + * @iov: the array of memory regions to read data to
> + * @niov: the length of the @iov array
> + * @offset: the iovec offset in the file from where to read the data
> + * @errp: pointer to a NULL-initialized error object
> + *
> + * Returns: 0 if all bytes were read, or -1 on error
> + */
> +int qio_channel_preadv_all(QIOChannel *ioc, const struct iovec *iov,
> +                           size_t niov, off_t offset, Error **errp);
> +
>  /**
>   * qio_channel_pread
>   * @ioc: the channel object
> diff --git a/io/channel.c b/io/channel.c
> index a1f12f8e90..2f1745d052 100644
> --- a/io/channel.c
> +++ b/io/channel.c
> @@ -472,6 +472,69 @@ ssize_t qio_channel_pwritev(QIOChannel *ioc, const struct iovec *iov,
>      return klass->io_pwritev(ioc, iov, niov, offset, errp);
>  }
>  
> +static int qio_channel_preadv_pwritev_contiguous(QIOChannel *ioc,
> +                                                 const struct iovec *iov,
> +                                                 size_t niov, off_t offset,
> +                                                 bool is_write, Error **errp)
> +{
> +    ssize_t ret = -1;
> +    int i, slice_idx, slice_num;
> +    uintptr_t base, next, file_offset;
> +    size_t len;
> +
> +    slice_idx = 0;
> +    slice_num = 1;
> +
> +    /*
> +     * If the iov array doesn't have contiguous elements, we need to
> +     * split it in slices because we only have one (file) 'offset' for
> +     * the whole iov. Do this here so callers don't need to break the
> +     * iov array themselves.
> +     */
> +    for (i = 0; i < niov; i++, slice_num++) {
> +        base = (uintptr_t) iov[i].iov_base;
> +
> +        if (i != niov - 1) {
> +            len = iov[i].iov_len;
> +            next = (uintptr_t) iov[i + 1].iov_base;
> +
> +            if (base + len == next) {
> +                continue;
> +            }
> +        }
> +
> +        /*
> +         * Use the offset of the first element of the segment that
> +         * we're sending.
> +         */
> +        file_offset = offset + (uintptr_t) iov[slice_idx].iov_base;
> +
> +        if (is_write) {
> +            ret = qio_channel_pwritev(ioc, &iov[slice_idx], slice_num,
> +                                      file_offset, errp);
> +        } else {
> +            ret = qio_channel_preadv(ioc, &iov[slice_idx], slice_num,
> +                                     file_offset, errp);
> +        }
> +
> +        if (ret < 0) {
> +            break;
> +        }
> +
> +        slice_idx += slice_num;
> +        slice_num = 0;
> +    }
> +
> +    return (ret < 0) ? -1 : 0;
> +}
> +
> +int qio_channel_pwritev_all(QIOChannel *ioc, const struct iovec *iov,
> +                            size_t niov, off_t offset, Error **errp)
> +{
> +    return qio_channel_preadv_pwritev_contiguous(ioc, iov, niov,
> +                                                 offset, true, errp);
> +}

I'm not sure what Dan thinks about this, but I don't think this is pretty.

With this implementation, the iochannel preadv/pwritev semantics no longer
match what most OSes provide, afaiu.

The definition of 'offset' in the iochannel preadv/pwritev here is hard to
understand. If I read it right, it will later be set to:

                /*
                 * If we subtract the host page now, we don't need to
                 * pass it into qio_channel_pwritev_all() below.
                 */
                write_base = p->pages->block->pages_offset -
                    (uintptr_t)p->pages->block->host;

I cannot easily tell what that value is, besides being an unsigned integer.

IIUC it's also based on the assumption that the host address of each iov
entry maps linearly to its offset in the file, but that may not hold for
future iochannel users of an interface named pwritev/preadv.  So it is
error prone.
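
To make the arithmetic concrete, here is a minimal sketch of how the two
quoted snippets appear to combine, as far as I can tell from the hunks above.
The function names are hypothetical and exist only for illustration; the
variable names mirror the quoted code. The intermediate value normally wraps
around, and the result is only meaningful when every iov entry lies inside the
same host block:

```c
#include <stdint.h>

/*
 * Hypothetical helper mirroring the quoted 'write_base' computation:
 * 'pages_offset' is where the block's pages start in the file and
 * 'host' is the block's host address.
 */
static uintptr_t make_write_base(uintptr_t pages_offset, const void *host)
{
    /* Usually wraps around: host addresses are far above file offsets. */
    return pages_offset - (uintptr_t)host;
}

/*
 * Hypothetical helper mirroring 'offset + (uintptr_t) iov_base' from the
 * patch.  The second wraparound cancels the first one, which leaves
 * pages_offset + (iov_base - host).
 */
static uintptr_t file_offset_of(uintptr_t write_base, const void *iov_base)
{
    return write_base + (uintptr_t)iov_base;
}
```

So the 'offset' argument is not a file position by itself; it only becomes one
after a host address from the same block is added back.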

Would it be possible to keep using the offset array (p->pages->offset[x])?
We have it already anyway, right?  Wouldn't that be clearer?

It doesn't need to be called pwritev/preadv; it could take two arrays: the
host address array and a matching array of offsets in the file.  It can still
do the range merge, with an extra sanity check that the offsets are also
contiguous (which should be true in our case).
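
A rough sketch of that two-array idea, assuming a plain file descriptor and
POSIX pwrite() instead of a QIOChannel. Both run_length() and
write_ranges_all() are hypothetical names, not existing QEMU APIs, and error
reporting is reduced to a return code:

```c
#include <stddef.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/*
 * Hypothetical helper: count how many entries, beginning at 'start', form
 * one run that is contiguous both in host memory and in the file.
 */
static size_t run_length(const struct iovec *iov, const off_t *offsets,
                         size_t niov, size_t start)
{
    size_t n = 1;

    while (start + n < niov) {
        const struct iovec *prev = &iov[start + n - 1];
        uintptr_t mem_end = (uintptr_t)prev->iov_base + prev->iov_len;
        off_t file_end = offsets[start + n - 1] + (off_t)prev->iov_len;

        /* Stop merging as soon as either address space has a gap. */
        if (mem_end != (uintptr_t)iov[start + n].iov_base ||
            file_end != offsets[start + n]) {
            break;
        }
        n++;
    }
    return n;
}

/*
 * Hypothetical helper: write each entry at its own explicit file offset,
 * merging contiguous runs into one pwrite().  Returns 0 or -1.
 */
static int write_ranges_all(int fd, const struct iovec *iov,
                            const off_t *offsets, size_t niov)
{
    size_t i = 0;

    while (i < niov) {
        size_t n = run_length(iov, offsets, niov, i);
        const char *buf = iov[i].iov_base;
        off_t off = offsets[i];
        size_t left = 0;

        for (size_t k = 0; k < n; k++) {
            left += iov[i + k].iov_len;
        }
        while (left > 0) {              /* retry after short writes */
            ssize_t done = pwrite(fd, buf, left, off);
            if (done <= 0) {
                return -1;
            }
            buf += done;
            off += done;
            left -= (size_t)done;
        }
        i += n;
    }
    return 0;
}
```

With explicit offsets the file position never depends on the host address, so
a caller whose buffers are not laid out linearly against the file still gets
correct writes; it just sees fewer merges.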

> +
>  ssize_t qio_channel_pwrite(QIOChannel *ioc, char *buf, size_t buflen,
>                             off_t offset, Error **errp)
>  {
> @@ -501,6 +564,13 @@ ssize_t qio_channel_preadv(QIOChannel *ioc, const struct iovec *iov,
>      return klass->io_preadv(ioc, iov, niov, offset, errp);
>  }
>  
> +int qio_channel_preadv_all(QIOChannel *ioc, const struct iovec *iov,
> +                           size_t niov, off_t offset, Error **errp)
> +{
> +    return qio_channel_preadv_pwritev_contiguous(ioc, iov, niov,
> +                                                 offset, false, errp);
> +}
> +
>  ssize_t qio_channel_pread(QIOChannel *ioc, char *buf, size_t buflen,
>                            off_t offset, Error **errp)
>  {
> -- 
> 2.35.3
> 

-- 
Peter Xu

