On 6/7/22 11:19, Claudio Fontana wrote:
> allow interleaved parallel write to a single file,
> using a record size equal to the io buffer size (1MB).
> 
> Signed-off-by: Claudio Fontana <cfont...@suse.de>
> ---
>  src/util/iohelper.c |   3 +
>  src/util/virfile.c  | 151 +++++++++++++++++++++++++++++---------------
>  src/util/virfile.h  |   2 +
>  3 files changed, 106 insertions(+), 50 deletions(-)
> 
> diff --git a/src/util/iohelper.c b/src/util/iohelper.c
> index 055540c8c4..dcbdda366f 100644
> --- a/src/util/iohelper.c
> +++ b/src/util/iohelper.c
> @@ -85,6 +85,9 @@ main(int argc, char **argv)
>      if (fd < 0 || virFileDiskCopy(fd, path, -1, "stdio") < 0)
>          goto error;
>  
> +    if (VIR_CLOSE(fd) < 0)
> +        goto error;
> +
>      return 0;
>  
>   error:
> diff --git a/src/util/virfile.c b/src/util/virfile.c
> index 201d7f4e64..f9ae7d94c4 100644
> --- a/src/util/virfile.c
> +++ b/src/util/virfile.c
> @@ -4761,6 +4761,9 @@ struct runIOParams {
>      const char *fdinname;
>      int fdout;
>      const char *fdoutname;
> +    int idx;
> +    int nchannels;
> +    off_t total;
>  };
>  
>  /**
> @@ -4779,12 +4782,18 @@ runIOCopy(const struct runIOParams p)
>      off_t total = 0;
>      size_t buflen = 1024*1024;
>      char *buf = virFileDirectBufferNew(&base, buflen);
> +    int diskfd = p.isWrite ? p.fdout : p.fdin;
>  
>      if (!buf) {
>          virReportSystemError(errno, _("Failed to allocate aligned memory in 
> function %s"), __FUNCTION__);
>          return -5;
>      }
> -
> +    if (p.idx >= 0) {
> +        if (lseek(diskfd, p.idx * buflen, SEEK_CUR) < 0) {
> +            virReportSystemError(errno, "%s", _("Failed to lseek to file 
> channel offset"));
> +            return -6;
> +        }
> +    }
>      while (1) {
>          ssize_t got;
>  
> @@ -4808,7 +4817,12 @@ runIOCopy(const struct runIOParams p)
>              break;
>  
>          total += got;
> -
> +        if (p.idx >= 0 && !p.isWrite && total > p.total) {
> +            /* do not write to socket too much for this channel, according 
> to CLIA */
> +            off_t difference = total - p.total;
> +            got -= difference;
> +            total -= difference;
> +        }
>          /* handle last write size align in direct case */
>          if (got < buflen && p.isDirect && p.isWrite) {
>              ssize_t nwritten = virFileDirectWrite(p.fdout, buf, got);
> @@ -4816,7 +4830,7 @@ runIOCopy(const struct runIOParams p)
>                  virReportSystemError(errno, _("Unable to write %s"), 
> p.fdoutname);
>                  return -3;
>              }
> -            if (!p.isBlockDev) {
> +            if (!p.isBlockDev && p.idx < 0) {
>                  off_t off = lseek(p.fdout, (off_t)0, SEEK_CUR);
>                  if (off < 0) {
>                      virReportSystemError(errno, "%s", _("Failed to lseek to 
> get current file offset"));
> @@ -4824,6 +4838,7 @@ runIOCopy(const struct runIOParams p)
>                  }
>                  if (nwritten > got) {
>                      off -= nwritten - got;
> +                    total -= nwritten - got;
>                  }
>                  if (ftruncate(p.fdout, off) < 0) {
>                      virReportSystemError(errno, _("Unable to truncate %s"), 
> p.fdoutname);
> @@ -4838,51 +4853,61 @@ runIOCopy(const struct runIOParams p)
>              virReportSystemError(errno, _("Unable to write %s"), 
> p.fdoutname);
>              return -3;
>          }
> +        if (p.idx >= 0) {
> +            if (!p.isWrite && total >= p.total) {
> +                /* done for this channel */
> +                break;
> +            }
> +            /* move channel cursor to the next record */
> +            if (lseek(diskfd, buflen * (p.nchannels - 1), SEEK_CUR) < 0) {
> +                virReportSystemError(errno, "%s", _("Failed to lseek to next 
> channel record"));
> +                return -7;
> +            }
> +        }
>      }
>      return total;
>  }
>  
>  /**
> - * virFileDiskCopy: run IO to copy data between storage and a pipe or socket.
> - *
> - * @disk_fd:     the already open regular file or block device
> - * @disk_path:   the pathname corresponding to disk_fd (for error reporting)
> - * @remote_fd:   the pipe or socket
> - *               Use -1 to auto-choose between STDIN or STDOUT.
> - * @remote_path: the pathname corresponding to remote_fd (for error 
> reporting)
> - *
> - * Note that the direction of the transfer is detected based on the @disk_fd
> - * file access mode (man 2 open). Therefore @disk_fd must be opened with
> - * O_RDONLY or O_WRONLY. O_RDWR is not supported.
> - *
> - * virFileDiskCopy always closes the file descriptor disk_fd,
> - * and any error during close(2) is reported and considered a failure.
> - *
> - * Returns: bytes transferred or < 0 on failure.
> + * virFileDiskCopyChannel: like virFileDiskCopy, channel interleaved 
> read/write
> + * ...
> + * @idx:       channel index
> + * @nchannels: total number of channels
>   */
>  
>  off_t
> -virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const 
> char *remote_path)
> +virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, 
> const char *remote_path,
> +                       int idx, int nchannels, off_t total)
>  {
> -    int ret = -1;
> -    off_t total = 0;
> +    off_t new_total = -1;
>      struct stat sb;
>      struct runIOParams p;
>      int oflags = -1;
>  
> +    if ((nchannels == 0) ||
> +        (nchannels > 0 && idx >= nchannels) ||
> +        (nchannels > 0 && idx < 0) ||
> +        (nchannels < 0 && idx >= 0)) {
> +        virReportSystemError(EINVAL, "%s", _("Invalid channel arguments"));
> +        goto out;
> +    }
> +    p.idx = idx;
> +    p.nchannels = nchannels;
> +    p.total = total;
> +
>      oflags = fcntl(disk_fd, F_GETFL);
>  
>      if (oflags < 0) {
>          virReportSystemError(errno,
>                               _("unable to determine access mode of %s"),
>                               disk_path);
> -        goto cleanup;
> +        goto out;
>      }
>      if (fstat(disk_fd, &sb) < 0) {
>          virReportSystemError(errno,
>                               _("unable to stat file descriptor %d path %s"),
>                               disk_fd, disk_path);
> -        goto cleanup;
> +        goto out;
>      }
>      p.isBlockDev = S_ISBLK(sb.st_mode);
>      p.isDirect = O_DIRECT && (oflags & O_DIRECT);
> @@ -4906,53 +4931,79 @@ virFileDiskCopy(int disk_fd, const char *disk_path, 
> int remote_fd, const char *r
>      default:
>          virReportSystemError(EINVAL, _("Unable to process file with flags 
> %d"),
>                               (oflags & O_ACCMODE));
> -        goto cleanup;
> +        goto out;
>      }
>      if (!p.isBlockDev && p.isDirect) {
>          off_t off = lseek(disk_fd, 0, SEEK_CUR);
>          if (off < 0) {
>              virReportSystemError(errno, "%s", _("O_DIRECT needs a seekable 
> file"));
> -            goto cleanup;
> +            goto out;
>          }
>          if (virFileDirectAlign(off) != off) {
>              /* we could write some zeroes, but maybe it is safer to just 
> fail */
>              virReportSystemError(EINVAL, "%s", _("O_DIRECT attempted with 
> unaligned file pointer"));
> -            goto cleanup;
> +            goto out;
>          }
>      }
> -    total = runIOCopy(p);
> -    if (total < 0)
> -        goto cleanup;
> -
> -    /* Ensure all data is written */
> -    if (virFileDataSync(p.fdout) < 0) {
> -        if (errno != EINVAL && errno != EROFS) {
> -            /* fdatasync() may fail on some special FDs, e.g. pipes */
> -            virReportSystemError(errno, _("unable to fsync %s"), 
> p.fdoutname);
> -            goto cleanup;
> +    new_total = runIOCopy(p);
> +    if (new_total < 0)
> +        goto out;
> +
> +    if (p.idx < 0 && p.isWrite) {
> +        /* without channels we can run the fdatasync here */
> +        if (virFileDataSync(disk_fd) < 0) {
> +            if (errno != EINVAL && errno != EROFS) {
> +                virReportSystemError(errno, _("unable to fsyncdata %s"), 
> p.fdoutname);
> +                new_total = -1;
> +                goto out;
> +            }
>          }
>      }
>  
> -    ret = 0;
> -
> - cleanup:
> -    if (VIR_CLOSE(disk_fd) < 0 && ret == 0) {
> -        virReportSystemError(errno, _("Unable to close %s"), disk_path);
> -        ret = -1;
> -    }
> -    return ret;
> + out:
> +    return new_total;
>  }
>  
>  #else /* WIN32 */
>  
>  off_t
> -virFileDiskCopy(int disk_fd G_GNUC_UNUSED,
> -                const char *disk_path G_GNUC_UNUSED,
> -                int remote_fd G_GNUC_UNUSED,
> -                const char *remote_path G_GNUC_UNUSED)
> +virFileDiskCopyChannel(int disk_fd G_GNUC_UNUSED,
> +                       const char *disk_path G_GNUC_UNUSED,
> +                       int remote_fd G_GNUC_UNUSED,
> +                       const char *remote_path G_GNUC_UNUSED,
> +                       int idx G_GNUC_UNUSED,
> +                       int nchannels G_GNUC_UNUSED,
> +                       off_t total G_GNUC_UNUSED)
>  {
>      virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
> -                   _("virFileDiskCopy unsupported on this platform"));
> +                   _("virFileDiskCopyChannel unsupported on this platform"));
>      return -1;
>  }
>  #endif /* WIN32 */
> +
> +/**
> + * virFileDiskCopy: run IO to copy data between storage and a pipe or socket.
> + *
> + * @disk_fd:     the already open regular file or block device
> + * @disk_path:   the pathname corresponding to disk_fd (for error reporting)
> + * @remote_fd:   the pipe or socket
> + *               Use -1 to auto-choose between STDIN or STDOUT.
> + * @remote_path: the pathname corresponding to remote_fd (for error 
> reporting)
> + *
> + * Note that the direction of the transfer is detected based on the @disk_fd
> + * file access mode (man 2 open). Therefore @disk_fd must be opened with
> + * O_RDONLY or O_WRONLY. O_RDWR is not supported.
> + *
> + * virFileDiskCopy always closes the file descriptor disk_fd,
> + * and any error during close(2) is reported and considered a failure.

this is not true anymore, the close needs to be done outside of virFileDiskCopy 
now.

> + *
> + * Returns: bytes transferred or < 0 on failure.
> + */
> +
> +off_t
> +virFileDiskCopy(int disk_fd, const char *disk_path,
> +                int remote_fd, const char *remote_path)
> +{
> +    return virFileDiskCopyChannel(disk_fd, disk_path, remote_fd, remote_path,
> +                                  -1, -1, 0);
> +}
> diff --git a/src/util/virfile.h b/src/util/virfile.h
> index 844261e0a4..4d75389c84 100644
> --- a/src/util/virfile.h
> +++ b/src/util/virfile.h
> @@ -394,3 +394,5 @@ int virFileSetCOW(const char *path,
>                    virTristateBool state);
>  
>  off_t virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, 
> const char *remote_path);
> +off_t virFileDiskCopyChannel(int disk_fd, const char *disk_path, int 
> remote_fd, const char *remote_path,
> +                             int idx, int nchannels, off_t total);

Reply via email to