On 05/05/2017 05:43 PM, John Ferlan wrote:
> 
> 
> On 04/20/2017 06:01 AM, Michal Privoznik wrote:
>> Whenever server sends a client stream packet (either regular with
>> actual data or stream skip one) it is queued on @st->rx. So the
>> list is a mixture of both types of stream packets. So now that we
>> have all the helpers needed we can wire their processing up. But
>> since virNetClientStreamRecvPacket doesn't support
>> VIR_STREAM_RECV_STOP_AT_HOLE flag yet, let's turn all received
>> skips into zeroes repeating requested times.
>>
> 
> Up to this point - I thought I had a good idea of what was going on, but
> this patch loses me.
> 
> I thought there was an "ordered" message queue to be processed...
> ordered in so much as we're "reading" a file and sending either a 'data'
> segment w/ data or a 'skip' segment with some number of bytes to skip.
> Where it could be:
> 
>    start...skip...data...[skip...data...]end
>    start...data...skip...[data...skip...]end
>    start...data...end
>    start...skip...end
> 
> So why does the code process and sum up the skips?

Because our streams are slightly more generic than just a file transfer.
I mean, we use it for just any data transfer. Therefore it could be used
even for this

start -> data -> skip -> skip -> data -> end.

You won't have this with regular files stored on a disk, but then again,
virStream can be used (and is used) for generic data transfer.
vol-download an vol-upload is just a subset of its usage.

Now, with that example, it makes sense to merge those skips into one
big, doesn't it? But I agree, it looks a bit weird. I can drop the
merging - it's not a performance issue. I was just trying to save caller
couple of calls to our APIs.

> 
> Is this because of some implementation detail (that I already forgot) of
> the message queue where signals are done after each "data..." or
> "skip...", so it won't matter?
> 
> Why not have everything you need in place before you wire this up -
> patch 25 and 30 seem to be able to go after "most of" patch 31.
> 
> John
> 
> I'm going to stop here.

Fair enough. I'm gonna send v2 with most of your review points worked
in. Except, for now I'm still gonna use Skip() and HoleSize() - changing
that now would be a non-trivial piece of work and thus I'd like to do it
just once, when we have clear agreement on naming. Hope you understand that.

> 
>> Signed-off-by: Michal Privoznik <mpriv...@redhat.com>
>> ---
>>  src/rpc/virnetclientstream.c | 44 
>> ++++++++++++++++++++++++++++++++++++++++++--
>>  1 file changed, 42 insertions(+), 2 deletions(-)
>>
>> diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
>> index c773524..ff35137 100644
>> --- a/src/rpc/virnetclientstream.c
>> +++ b/src/rpc/virnetclientstream.c
>> @@ -296,6 +296,8 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr 
>> st,
>>  
>>      virObjectLock(st);
>>  
>> +    /* Don't distinguish VIR_NET_STREAM and VIR_NET_STREAM_SKIP
>> +     * here just yet. We want in order processing! */
>>      virNetMessageQueuePush(&st->rx, tmp_msg);
>>  
>>      virNetClientStreamEventTimerUpdate(st);
>> @@ -359,7 +361,7 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr 
>> st,
>>  }
>>  
>>  
>> -static int ATTRIBUTE_UNUSED
>> +static int
>>  virNetClientStreamHandleSkip(virNetClientPtr client,
>>                               virNetClientStreamPtr st)
>>  {
>> @@ -435,6 +437,8 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr 
>> st,
>>      virCheckFlags(0, -1);
>>  
>>      virObjectLock(st);
>> +
>> + reread:
>>      if (!st->rx && !st->incomingEOF) {
>>          virNetMessagePtr msg;
>>          int ret;
>> @@ -466,8 +470,44 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr 
>> st,
>>      }
>>  
>>      VIR_DEBUG("After IO rx=%p", st->rx);
>> +
>> +    while (st->rx &&
>> +           st->rx->header.type == VIR_NET_STREAM_SKIP) {
>> +        /* Handle skip sent to us by server. */
>> +
>> +        if (virNetClientStreamHandleSkip(client, st) < 0)
>> +            goto cleanup;

You can see my reasoning from above in effect here.

>> +    }
>> +
>> +    if (!st->rx && !st->incomingEOF && !st->skipLength) {
>> +        if (nonblock) {
>> +            VIR_DEBUG("Non-blocking mode and no data available");
>> +            rv = -2;
>> +            goto cleanup;
>> +        }
>> +
>> +        /* We have consumed all packets from incoming queue but those
>> +         * were only skip packets, no data. Read the stream again. */
>> +        goto reread;
>> +    }
>> +
>>      want = nbytes;
>> -    while (want && st->rx) {
>> +
>> +    if (st->skipLength) {
>> +        /* Pretend skipLength zeroes was read from stream. */
>> +        size_t len = want;
>> +
>> +        if (len > st->skipLength)
>> +            len = st->skipLength;
>> +
>> +        memset(data, 0, len);
>> +        st->skipLength -= len;
>> +        want -= len;
>> +    }
>> +
>> +    while (want &&
>> +           st->rx &&
>> +           st->rx->header.type == VIR_NET_STREAM) {
>>          virNetMessagePtr msg = st->rx;
>>          size_t len = want;
>>  
>>

Michal

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list

Reply via email to