Re: [libvirt] [PATCH v3 27/31] fdstream: Implement sparse stream

2017-05-17 Thread John Ferlan


On 05/16/2017 10:04 AM, Michal Privoznik wrote:
> Basically, what is needed here is to introduce new message type
> for the messages passed between the event loop callbacks and the
> worker thread that does all the I/O. The idea is that instead of
> a queue of read buffers we will have a queue where "hole of size
> X" messages appear. That way the even loop callbacks can just

s/even/event/

> check the head of the queue and see if the worker thread is in
> data or a hole section and how long the section is.
> 
> Signed-off-by: Michal Privoznik 
> ---
>  src/storage/storage_util.c |   4 +-
>  src/util/virfdstream.c | 239 
> -
>  src/util/virfdstream.h |   1 +
>  3 files changed, 220 insertions(+), 24 deletions(-)
> 

[...]

> diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
> index 4b42939e7..ba209025a 100644
> --- a/src/util/virfdstream.c
> +++ b/src/util/virfdstream.c

[...]

>  static ssize_t
>  virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
> +bool sparse,
>  const int fdin,
>  const int fdout,
>  const char *fdinname,
>  const char *fdoutname,
> +size_t *dataLen,
>  size_t buflen)
>  {
>  virFDStreamMsgPtr msg = NULL;
> +int inData = 0;
> +long long sectionLen = 0;
>  char *buf = NULL;
>  ssize_t got;
>  
> +if (sparse && *dataLen == 0) {
> +if (virFileInData(fdin, &inData, &sectionLen) < 0)
> +goto error;
> +
> +if (inData)
> +*dataLen = sectionLen;
> +}
> +
>  if (VIR_ALLOC(msg) < 0)
>  goto error;
>  
> -if (VIR_ALLOC_N(buf, buflen) < 0)
> -goto error;
> -
> -if ((got = saferead(fdin, buf, buflen)) < 0) {
> -virReportSystemError(errno,
> - _("Unable to read %s"),
> - fdinname);
> -goto error;
> +if (sparse && *dataLen == 0) {
> +msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE;
> +msg->stream.hole.len = sectionLen;
> +got = sectionLen;
> +
> +/* HACK. The message queue is one directional. So caller

HACK or "By design"

> + * cannot make us skip the hole. Do that for them instead. */
> +if (sectionLen &&
> +lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) {
> +virReportSystemError(errno,
> + _("unable to seek in %s"),
> + fdinname);
> +goto error;
> +}

[...]

> +static int
> +virFDStreamSendHole(virStreamPtr st,
> +long long length,
> +unsigned int flags)
> +{
> +virFDStreamDataPtr fdst = st->privateData;
> +virFDStreamMsgPtr msg = NULL;
> +off_t off;
> +int ret = -1;
> +
> +virCheckFlags(0, -1);
> +
> +virObjectLock(fdst);
> +if (fdst->length) {
> +if (length > fdst->length - fdst->offset)
> +length = fdst->length - fdst->offset;
> +fdst->offset += length;
> +}
> +
> +if (fdst->thread) {
> +/* Things are a bit complicated here. But bear with me. If FDStream 
> is


s/But bear with me.//

> + * in a read mode, then if the message at the queue head is HOLE, 
> just
> + * pop it. The thread has lseek()-ed anyway. If however, the FDStream

However, if the FDStream

Reviewed-by: John Ferlan 

John

> + * is in write mode, then tell the thread to do the lseek() for us.
> + * Under no circumstances we can do the lseek() ourselves here. We
> + * might mess up file position for the thread. */
> +if (fdst->threadDoRead) {
> +msg = fdst->msg;
> +if (msg->type != VIR_FDSTREAM_MSG_TYPE_HOLE) {
> +virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
> +   _("Invalid stream hole"));
> +goto cleanup;
> +}
> +
> +virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe");
> +} else {
> +if (VIR_ALLOC(msg) < 0)
> +goto cleanup;
> +
> +msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE;
> +msg->stream.hole.len = length;
> +virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe");
> +msg = NULL;
> +}

[...]

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list


[libvirt] [PATCH v3 27/31] fdstream: Implement sparse stream

2017-05-16 Thread Michal Privoznik
Basically, what is needed here is to introduce new message type
for the messages passed between the event loop callbacks and the
worker thread that does all the I/O. The idea is that instead of
a queue of read buffers we will have a queue where "hole of size
X" messages appear. That way the event loop callbacks can just
check the head of the queue and see if the worker thread is in
data or a hole section and how long the section is.

Signed-off-by: Michal Privoznik 
---
 src/storage/storage_util.c |   4 +-
 src/util/virfdstream.c | 239 -
 src/util/virfdstream.h |   1 +
 3 files changed, 220 insertions(+), 24 deletions(-)

diff --git a/src/storage/storage_util.c b/src/storage/storage_util.c
index 43f3561f8..908cad874 100644
--- a/src/storage/storage_util.c
+++ b/src/storage/storage_util.c
@@ -2427,7 +2427,7 @@ virStorageBackendVolUploadLocal(virConnectPtr conn 
ATTRIBUTE_UNUSED,
 /* Not using O_CREAT because the file is required to already exist at
  * this point */
 ret = virFDStreamOpenBlockDevice(stream, target_path,
- offset, len, O_WRONLY);
+ offset, len, false, O_WRONLY);
 
  cleanup:
 VIR_FREE(path);
@@ -2465,7 +2465,7 @@ virStorageBackendVolDownloadLocal(virConnectPtr conn 
ATTRIBUTE_UNUSED,
 }
 
 ret = virFDStreamOpenBlockDevice(stream, target_path,
- offset, len, O_RDONLY);
+ offset, len, false, O_RDONLY);
 
  cleanup:
 VIR_FREE(path);
diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index 4b42939e7..ba209025a 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -51,6 +51,7 @@ VIR_LOG_INIT("fdstream");
 
 typedef enum {
 VIR_FDSTREAM_MSG_TYPE_DATA,
+VIR_FDSTREAM_MSG_TYPE_HOLE,
 } virFDStreamMsgType;
 
 typedef struct _virFDStreamMsg virFDStreamMsg;
@@ -66,6 +67,9 @@ struct _virFDStreamMsg {
 size_t len;
 size_t offset;
 } data;
+struct {
+long long len;
+} hole;
 } stream;
 };
 
@@ -198,6 +202,9 @@ virFDStreamMsgFree(virFDStreamMsgPtr msg)
 case VIR_FDSTREAM_MSG_TYPE_DATA:
 VIR_FREE(msg->stream.data.buf);
 break;
+case VIR_FDSTREAM_MSG_TYPE_HOLE:
+/* nada */
+break;
 }
 
 VIR_FREE(msg);
@@ -385,6 +392,7 @@ struct _virFDStreamThreadData {
 virStreamPtr st;
 size_t length;
 bool doRead;
+bool sparse;
 int fdin;
 char *fdinname;
 int fdout;
@@ -407,34 +415,68 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
 
 static ssize_t
 virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
+bool sparse,
 const int fdin,
 const int fdout,
 const char *fdinname,
 const char *fdoutname,
+size_t *dataLen,
 size_t buflen)
 {
 virFDStreamMsgPtr msg = NULL;
+int inData = 0;
+long long sectionLen = 0;
 char *buf = NULL;
 ssize_t got;
 
+if (sparse && *dataLen == 0) {
+if (virFileInData(fdin, &inData, &sectionLen) < 0)
+goto error;
+
+if (inData)
+*dataLen = sectionLen;
+}
+
 if (VIR_ALLOC(msg) < 0)
 goto error;
 
-if (VIR_ALLOC_N(buf, buflen) < 0)
-goto error;
-
-if ((got = saferead(fdin, buf, buflen)) < 0) {
-virReportSystemError(errno,
- _("Unable to read %s"),
- fdinname);
-goto error;
+if (sparse && *dataLen == 0) {
+msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE;
+msg->stream.hole.len = sectionLen;
+got = sectionLen;
+
+/* HACK. The message queue is one directional. So caller
+ * cannot make us skip the hole. Do that for them instead. */
+if (sectionLen &&
+lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) {
+virReportSystemError(errno,
+ _("unable to seek in %s"),
+ fdinname);
+goto error;
+}
+} else {
+if (sparse &&
+buflen > *dataLen)
+buflen = *dataLen;
+
+if (VIR_ALLOC_N(buf, buflen) < 0)
+goto error;
+
+if ((got = saferead(fdin, buf, buflen)) < 0) {
+virReportSystemError(errno,
+ _("Unable to read %s"),
+ fdinname);
+goto error;
+}
+
+msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+msg->stream.data.buf = buf;
+msg->stream.data.len = got;
+buf = NULL;
+if (sparse)
+*dataLen -= got;
 }
 
-msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
-msg->stream.data.buf = buf;
-msg->stream.data.len = got;
-buf = NULL;
-