On Sat, Jun 06, 2015 at 07:36:48PM +0000, Ossi Herrala wrote:
Use I/O vector (iovec) instead of one huge memory buffer as suggested
in https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids
doing memmove() to big buffers and performance doesn't degrade if
source (virNetClientStreamQueuePacket()) is faster than sink
(virNetClientStreamRecvPacket()).

Sorry for missing this mail; it got buried somehow and I didn't get to it
until now since nobody pinged it.  Sorry for the long wait.

I would remove the 'RH1026137: ' from the commit message and instead
add a 'Resolves: http://bugzilla.redhat.com/1026137' or something
similar here.

---
src/rpc/virnetclientstream.c |  134 +++++++++++++++++++++++++----------------
1 files changed, 82 insertions(+), 52 deletions(-)

diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index b428f4b..18c6e8b 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -49,9 +49,9 @@ struct _virNetClientStream {
     * time by stopping consuming any incoming data
     * off the socket....
     */
-    char *incoming;
-    size_t incomingOffset;
-    size_t incomingLength;
+    struct iovec *incomingVec; /* I/O Vector to hold data */
+    size_t writeVec;           /* Vectors produced */
+    size_t readVec;            /* Vectors consumed */
    bool incomingEOF;

    virNetClientStreamEventCallback cb;
@@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
    if (!st->cb)
        return;

-    VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
+    VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, 
st->writeVec, st->cbEvents);

-    if (((st->incomingOffset || st->incomingEOF) &&
+    if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
         (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
        (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
        VIR_DEBUG("Enabling event timer");
@@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, 
void *opaque)

    if (st->cb &&
        (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
-        (st->incomingOffset || st->incomingEOF))
+        ((st->readVec < st->writeVec) || st->incomingEOF))
        events |= VIR_STREAM_EVENT_READABLE;
    if (st->cb &&
        (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
        events |= VIR_STREAM_EVENT_WRITABLE;

-    VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, 
st->incomingOffset);
+    VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, 
st->cbEvents,
+              st->readVec, st->writeVec);
    if (events) {
        virNetClientStreamEventCallback cb = st->cb;
        void *cbOpaque = st->cbOpaque;
@@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj)
    virNetClientStreamPtr st = obj;

    virResetError(&st->err);
-    VIR_FREE(st->incoming);
+    VIR_FREE(st->incomingVec);
    virObjectUnref(st->prog);
}

@@ -265,38 +266,49 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr 
st,
                                  virNetMessagePtr msg)
{
    int ret = -1;
-    size_t need;
+    struct iovec iov;
+    char *base;
+    size_t piece, pieces, length, offset = 0, size = 1024*1024;

    virObjectLock(st);
-    need = msg->bufferLength - msg->bufferOffset;
-    if (need) {
-        size_t avail = st->incomingLength - st->incomingOffset;
-        if (need > avail) {
-            size_t extra = need - avail;
-            if (VIR_REALLOC_N(st->incoming,
-                              st->incomingLength + extra) < 0) {
-                VIR_DEBUG("Out of memory handling stream data");
-                goto cleanup;
-            }
-            st->incomingLength += extra;
-        }

-        memcpy(st->incoming + st->incomingOffset,
-               msg->buffer + msg->bufferOffset,
-               msg->bufferLength - msg->bufferOffset);
-        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
-    } else {
+    length = msg->bufferLength - msg->bufferOffset;
+
+    if (length == 0) {
        st->incomingEOF = true;
+        goto end;
    }

-    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
-              st->incomingOffset, st->incomingLength,
-              st->incomingEOF);
+    pieces = (length + size - 1) / size;
+    for (piece = 0; piece < pieces; piece++) {
+        if (size > length - offset)
+            size = length - offset;
+
+        if (VIR_ALLOC_N(base, size)) {
+            VIR_DEBUG("Allocation failed");
+            goto cleanup;
+        }
+
+        memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
+        iov.iov_base = base;
+        iov.iov_len = size;
+        offset += size;
+
+        if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
+            VIR_DEBUG("Append failed");
+            VIR_FREE(base);
+            goto cleanup;
+        }
+        VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu", 
st->readVec, st->writeVec, size);

Long line, should be wrapped.

+    }
+
+ end:
    virNetClientStreamEventTimerUpdate(st);
-
    ret = 0;

 cleanup:
+    VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
+              st->readVec, st->writeVec, st->incomingEOF);
    virObjectUnlock(st);
    return ret;
}
@@ -361,17 +373,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
                                 size_t nbytes,
                                 bool nonblock)
{
-    int rv = -1;
+    int ret = -1;
+    size_t partial, offset;
+
+    virObjectLock(st);
+
    VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
              st, client, data, nbytes, nonblock);
-    virObjectLock(st);
-    if (!st->incomingOffset && !st->incomingEOF) {
+
+    if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
        virNetMessagePtr msg;
-        int ret;
+        int rv;

        if (nonblock) {
            VIR_DEBUG("Non-blocking mode and no data available");
-            rv = -2;
+            ret = -2;
            goto cleanup;
        }

@@ -387,37 +403,51 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,

        VIR_DEBUG("Dummy packet to wait for stream data");
        virObjectUnlock(st);
-        ret = virNetClientSendWithReplyStream(client, msg, st);
+        rv = virNetClientSendWithReplyStream(client, msg, st);
        virObjectLock(st);
        virNetMessageFree(msg);

-        if (ret < 0)
+        if (rv < 0)
            goto cleanup;
    }

-    VIR_DEBUG("After IO %zu", st->incomingOffset);
-    if (st->incomingOffset) {
-        int want = st->incomingOffset;
-        if (want > nbytes)
-            want = nbytes;
-        memcpy(data, st->incoming, want);
-        if (want < st->incomingOffset) {
-            memmove(st->incoming, st->incoming + want, st->incomingOffset - 
want);
-            st->incomingOffset -= want;
+    offset = 0;
+    partial = nbytes;
+
+    while (st->incomingVec && (st->readVec < st->writeVec)) {
+        struct iovec *iov = st->incomingVec + st->readVec;
+
+        if (!iov || !iov->iov_base) {
+            VIR_DEBUG("NULL pointer");

This should be virReportError(VIR_ERR_INTERNAL_ERROR, ...) or VIR_ERR_RPC.

+            goto cleanup;
+        }
+
+        if (partial < iov->iov_len) {
+            memcpy(data+offset, iov->iov_base, partial);
+            memmove(iov->iov_base, (char*)iov->iov_base+partial, 
iov->iov_len-partial);

Long line.

+            iov->iov_len -= partial;
+            offset += partial;
+            VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
+            break;
        } else {

You don't need to enclose this in an else body thanks to the break
above.

-            VIR_FREE(st->incoming);
-            st->incomingOffset = st->incomingLength = 0;
+            memcpy(data+offset, iov->iov_base, iov->iov_len);
+            VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
+            partial -= iov->iov_len;
+            offset += iov->iov_len;
+            VIR_FREE(iov->iov_base);
+            iov->iov_len = 0;
+            st->readVec++;

The only thing I would mention with regard to how it works after this
patch is that it will consume some memory that's not needed, precisely
(sizeof(struct iovec) + sizeof(void *)) * unreadMBs.  It might be
worth it to do:

 memmove(st->incomingVec, st->incomingVec + st->readVec,
         st->writeVec - st->readVec);
 VIR_SHRINK_N(st->incomingVec, st->readVec);
 st->writeVec -= st->readVec;
 st->readVec = 0;

Apart from that it's definitely a *way* better approach.  What do you
think of such a modification?  This would not go in place of
'st->readVec++', but rather at the end of this function, so that it's
not done after each MB read.

        }
-        rv = want;
-    } else {
-        rv = 0;
+
+        VIR_DEBUG("Read piece of vector. read %zu readVec %zu, writeVec %zu", 
offset, st->readVec, st->writeVec);

Long line.

    }

+    ret = offset;
    virNetClientStreamEventTimerUpdate(st);

 cleanup:
    virObjectUnlock(st);
-    return rv;
+    return ret;
}


Apart from the mentioned cosmetic changes, this looks very nice.

Martin

Attachment: signature.asc
Description: PGP signature

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list

Reply via email to