Re: [libvirt] [PATCH v3 01/31] virfdstream: Use messages instead of pipe

2017-05-17 Thread Michal Privoznik
On 05/16/2017 10:26 PM, John Ferlan wrote:
> 
> 
> On 05/16/2017 10:03 AM, Michal Privoznik wrote:
>> One big downside of using the pipe to transfer the data is that
>> we can really transfer just bare data. No metadata can be carried
>> through unless some formatted messages are introduced. That would
>> be quite painful to achieve so let's use a message queue. It's
>> fairly easy to exchange info between threads now that iohelper is
>> no longer used.
>>
>> The reason why we cannot use the FD for plain files directly is
>> that despite us setting noblock flag on the FD, any
>> read()/write() blocks regardless (which is a show stopper since
>> those parts of the code are run from the event loop) and poll()
>> reports such FD as always readable/writable - even though the
>> subsequent operation might block.
>>
>> The pipe is still not gone though. It is used to signal to even
> 
> s/to even/the event/
> 
>> loop that an event occurred (e.g. data are available for reading
> 
> s/are/is   (yes, an oddity of the language)
> 
>> in the queue, or vice versa).
>>
>> Signed-off-by: Michal Privoznik 
>> ---
>>  src/util/virfdstream.c | 402 
>> ++---
>>  1 file changed, 350 insertions(+), 52 deletions(-)
>>
> 
> I'm still getting a compilation error on this patch...
> 
> util/virfdstream.c: In function 'virFDStreamThread':
> util/virfdstream.c:551:15: error: 'got' may be used uninitialized in
> this function [-Werror=maybe-uninitialized]
>  total += got;
>^~
> 
>> diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
>> index 5ce78fe58..4b42939e7 100644
>> --- a/src/util/virfdstream.c
>> +++ b/src/util/virfdstream.c
>> @@ -49,6 +49,27 @@
> 
> [...]
> 
>> +static ssize_t
>> +virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
>> + const int fdin,
>> + const int fdout,
>> + const char *fdinname,
>> + const char *fdoutname)
>> +{
>> +ssize_t got;
> 
> got = 0;
> 
> Fixes the compilation issue since got is only set for MSG_TYPE_DATA and
> even though there is only that type, the compiler seems to somehow
> believe it could be set ambiguously.

A-ha! So this function might return uninitialized value (variable?),
which is propagated to virFDStreamThread where it hits an error. Well,
one one hand compiler tries to be smart (neither of those checks
including @got in the parent function cause the compilation error since
in that case @got is initialized); but on the other hand compiler fails
to see there's no way with the current code for msg->type to be
something different than MSG_TYPE_DATA in which case @got is set a value.
Sigh.

> 
>> +virFDStreamMsgPtr msg = fdst->msg;
>> +bool pop = false;
>> +
>> +switch (msg->type) {
>> +case VIR_FDSTREAM_MSG_TYPE_DATA:
>> +got = safewrite(fdout,
>> +msg->stream.data.buf + msg->stream.data.offset,
>> +msg->stream.data.len - msg->stream.data.offset);
>> +if (got < 0) {
>> +virReportSystemError(errno,
>> + _("Unable to write %s"),
>> + fdoutname);
>> +return -1;
>> +}
>> +
>> +msg->stream.data.offset += got;
>> +
>> +pop = msg->stream.data.offset == msg->stream.data.len;
>> +break;
>> +}
>> +
>> +if (pop) {
>> +virFDStreamMsgQueuePop(fdst, fdin, fdinname);
>> +virFDStreamMsgFree(msg);
>> +}
>> +
>> +return got;
>> +}
>> +
>> +
>>  static void
>>  virFDStreamThread(void *opaque)
>>  {
>> @@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
>>  int fdout = data->fdout;
>>  char *fdoutname = data->fdoutname;
>>  virFDStreamDataPtr fdst = st->privateData;
>> -char *buf = NULL;
>> +bool doRead = fdst->threadDoRead;
> 
> Should the fdst ref come eafter the ObjectLock(fdst) below? [1]

Actually, it doesn't matter. At this point, @fdst should have at least
one reference held by parent process (I mean the other thread that
spawned this thread). Not even in the case when unfair thread scheduling
occurs. I mean, imagine streamOpen() & streamClose() to be called
immediately one after another. steamOpen() spawns the thread, but lets
assume that the scheduler is unfair and does not schedule the thread for
a while. Well, streamClose() waits for the thread to join anyway,
therefore it doesn't really matter on ordering of lock() & ref() operations.

Michal

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list


Re: [libvirt] [PATCH v3 01/31] virfdstream: Use messages instead of pipe

2017-05-16 Thread John Ferlan


On 05/16/2017 10:03 AM, Michal Privoznik wrote:
> One big downside of using the pipe to transfer the data is that
> we can really transfer just bare data. No metadata can be carried
> through unless some formatted messages are introduced. That would
> be quite painful to achieve so let's use a message queue. It's
> fairly easy to exchange info between threads now that iohelper is
> no longer used.
> 
> The reason why we cannot use the FD for plain files directly is
> that despite us setting noblock flag on the FD, any
> read()/write() blocks regardless (which is a show stopper since
> those parts of the code are run from the event loop) and poll()
> reports such FD as always readable/writable - even though the
> subsequent operation might block.
> 
> The pipe is still not gone though. It is used to signal to even

s/to even/the event/

> loop that an event occurred (e.g. data are available for reading

s/are/is   (yes, an oddity of the language)

> in the queue, or vice versa).
> 
> Signed-off-by: Michal Privoznik 
> ---
>  src/util/virfdstream.c | 402 
> ++---
>  1 file changed, 350 insertions(+), 52 deletions(-)
> 

I'm still getting a compilation error on this patch...

util/virfdstream.c: In function 'virFDStreamThread':
util/virfdstream.c:551:15: error: 'got' may be used uninitialized in
this function [-Werror=maybe-uninitialized]
 total += got;
   ^~

> diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
> index 5ce78fe58..4b42939e7 100644
> --- a/src/util/virfdstream.c
> +++ b/src/util/virfdstream.c
> @@ -49,6 +49,27 @@

[...]

> +static ssize_t
> +virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
> + const int fdin,
> + const int fdout,
> + const char *fdinname,
> + const char *fdoutname)
> +{
> +ssize_t got;

got = 0;

Fixes the compilation issue since got is only set for MSG_TYPE_DATA and
even though there is only that type, the compiler seems to somehow
believe it could be set ambiguously.

> +virFDStreamMsgPtr msg = fdst->msg;
> +bool pop = false;
> +
> +switch (msg->type) {
> +case VIR_FDSTREAM_MSG_TYPE_DATA:
> +got = safewrite(fdout,
> +msg->stream.data.buf + msg->stream.data.offset,
> +msg->stream.data.len - msg->stream.data.offset);
> +if (got < 0) {
> +virReportSystemError(errno,
> + _("Unable to write %s"),
> + fdoutname);
> +return -1;
> +}
> +
> +msg->stream.data.offset += got;
> +
> +pop = msg->stream.data.offset == msg->stream.data.len;
> +break;
> +}
> +
> +if (pop) {
> +virFDStreamMsgQueuePop(fdst, fdin, fdinname);
> +virFDStreamMsgFree(msg);
> +}
> +
> +return got;
> +}
> +
> +
>  static void
>  virFDStreamThread(void *opaque)
>  {
> @@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
>  int fdout = data->fdout;
>  char *fdoutname = data->fdoutname;
>  virFDStreamDataPtr fdst = st->privateData;
> -char *buf = NULL;
> +bool doRead = fdst->threadDoRead;

Should the fdst ref come eafter the ObjectLock(fdst) below? [1]

>  size_t buflen = 256 * 1024;
>  size_t total = 0;
>  
>  virObjectRef(fdst);
> -
> -if (VIR_ALLOC_N(buf, buflen) < 0)
> -goto error;
> +virObjectLock(fdst);

^^^ [1]

Reviewed-by: John Ferlan 


John

[...]

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list


[libvirt] [PATCH v3 01/31] virfdstream: Use messages instead of pipe

2017-05-16 Thread Michal Privoznik
One big downside of using the pipe to transfer the data is that
we can really transfer just bare data. No metadata can be carried
through unless some formatted messages are introduced. That would
be quite painful to achieve so let's use a message queue. It's
fairly easy to exchange info between threads now that iohelper is
no longer used.

The reason why we cannot use the FD for plain files directly is
that despite us setting noblock flag on the FD, any
read()/write() blocks regardless (which is a show stopper since
those parts of the code are run from the event loop) and poll()
reports such FD as always readable/writable - even though the
subsequent operation might block.

The pipe is still not gone though. It is used to signal to even
loop that an event occurred (e.g. data are available for reading
in the queue, or vice versa).

Signed-off-by: Michal Privoznik 
---
 src/util/virfdstream.c | 402 ++---
 1 file changed, 350 insertions(+), 52 deletions(-)

diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
index 5ce78fe58..4b42939e7 100644
--- a/src/util/virfdstream.c
+++ b/src/util/virfdstream.c
@@ -49,6 +49,27 @@
 
 VIR_LOG_INIT("fdstream");
 
+typedef enum {
+VIR_FDSTREAM_MSG_TYPE_DATA,
+} virFDStreamMsgType;
+
+typedef struct _virFDStreamMsg virFDStreamMsg;
+typedef virFDStreamMsg *virFDStreamMsgPtr;
+struct _virFDStreamMsg {
+virFDStreamMsgPtr next;
+
+virFDStreamMsgType type;
+
+union {
+struct {
+char *buf;
+size_t len;
+size_t offset;
+} data;
+} stream;
+};
+
+
 /* Tunnelled migration stream support */
 typedef struct virFDStreamData virFDStreamData;
 typedef virFDStreamData *virFDStreamDataPtr;
@@ -80,18 +101,25 @@ struct virFDStreamData {
 
 /* Thread data */
 virThreadPtr thread;
+virCond threadCond;
 int threadErr;
 bool threadQuit;
+bool threadAbort;
+bool threadDoRead;
+virFDStreamMsgPtr msg;
 };
 
 static virClassPtr virFDStreamDataClass;
 
+static void virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue);
+
 static void
 virFDStreamDataDispose(void *obj)
 {
 virFDStreamDataPtr fdst = obj;
 
 VIR_DEBUG("obj=%p", fdst);
+virFDStreamMsgQueueFree(>msg);
 }
 
 static int virFDStreamDataOnceInit(void)
@@ -108,6 +136,89 @@ static int virFDStreamDataOnceInit(void)
 VIR_ONCE_GLOBAL_INIT(virFDStreamData)
 
 
+static int
+virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
+virFDStreamMsgPtr msg,
+int fd,
+const char *fdname)
+{
+virFDStreamMsgPtr *tmp = >msg;
+char c = '1';
+
+while (*tmp)
+tmp = &(*tmp)->next;
+
+*tmp = msg;
+virCondSignal(>threadCond);
+
+if (safewrite(fd, , sizeof(c)) != sizeof(c)) {
+virReportSystemError(errno,
+ _("Unable to write to %s"),
+ fdname);
+return -1;
+}
+
+return 0;
+}
+
+
+static virFDStreamMsgPtr
+virFDStreamMsgQueuePop(virFDStreamDataPtr fdst,
+   int fd,
+   const char *fdname)
+{
+virFDStreamMsgPtr tmp = fdst->msg;
+char c;
+
+if (tmp) {
+fdst->msg = tmp->next;
+tmp->next = NULL;
+}
+
+virCondSignal(>threadCond);
+
+if (saferead(fd, , sizeof(c)) != sizeof(c)) {
+virReportSystemError(errno,
+ _("Unable to read from %s"),
+ fdname);
+return NULL;
+}
+
+return tmp;
+}
+
+
+static void
+virFDStreamMsgFree(virFDStreamMsgPtr msg)
+{
+if (!msg)
+return;
+
+switch (msg->type) {
+case VIR_FDSTREAM_MSG_TYPE_DATA:
+VIR_FREE(msg->stream.data.buf);
+break;
+}
+
+VIR_FREE(msg);
+}
+
+
+static void
+virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue)
+{
+virFDStreamMsgPtr tmp = *queue;
+
+while (tmp) {
+virFDStreamMsgPtr next = tmp->next;
+virFDStreamMsgFree(tmp);
+tmp = next;
+}
+
+*queue = NULL;
+}
+
+
 static int virFDStreamRemoveCallback(virStreamPtr stream)
 {
 virFDStreamDataPtr fdst = stream->privateData;
@@ -273,6 +384,7 @@ typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
 struct _virFDStreamThreadData {
 virStreamPtr st;
 size_t length;
+bool doRead;
 int fdin;
 char *fdinname;
 int fdout;
@@ -293,6 +405,86 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
 }
 
 
+static ssize_t
+virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
+const int fdin,
+const int fdout,
+const char *fdinname,
+const char *fdoutname,
+size_t buflen)
+{
+virFDStreamMsgPtr msg = NULL;
+char *buf = NULL;
+ssize_t got;
+
+if (VIR_ALLOC(msg) < 0)
+goto error;
+
+if (VIR_ALLOC_N(buf, buflen) < 0)
+goto