See the commit message on the patch.

Without this patch, libssh2 memory usage grew uncontrollably when receiving large amounts of data.

With this patch applied I have been able to perform transfers of hundreds of GBs correctly.
>From f4eb1dbd837ba169a7198c74b3b4fcf459b53643 Mon Sep 17 00:00:00 2001
From: Salvador Fandino <[email protected]>
Date: Sat, 12 Oct 2013 02:51:46 +0200
Subject: [PATCH] Fix flow control

Until now, the window size (channel->remote.window_size) was being updated
just after receiving the packet from the transport layer.

That behaviour is wrong because the channel queue may grow uncontrollably
when data arrives from the network faster than the upper layer consumes it.

This patch adds a new counter, read_avail, which keeps a count of the bytes
available from the packet queue for reading. Also, now the window size is
adjusted when the data is actually read by an upper layer.

That way, if the upper layer stops reading data, the window will eventually
fill and the remote host will stop sending data. When the upper layer reads
enough data, a window adjust packet is delivered and the transfer resumes.

The read_avail counter is used to detect the situation when the remote
server tries to send data surpassing the window size. In that case, the
extra data is discarded.

Signed-off-by: Salvador Fandino <[email protected]>
---
 src/channel.c      |  8 +++++++-
 src/libssh2_priv.h |  2 ++
 src/packet.c       | 33 +++++++++++++++++++++++++--------
 3 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/src/channel.c b/src/channel.c
index 128a04e..68b1857 100644
--- a/src/channel.c
+++ b/src/channel.c
@@ -1414,6 +1414,9 @@ _libssh2_channel_flush(LIBSSH2_CHANNEL *channel, int streamid)
         channel->flush_state = libssh2_NB_state_created;
     }
 
+    channel->read_avail -= channel->flush_flush_bytes;
+    channel->remote.window_size -= channel->flush_flush_bytes;
+
     if (channel->flush_refund_bytes) {
         int rc;
 
@@ -1871,11 +1874,14 @@ ssize_t _libssh2_channel_read(LIBSSH2_CHANNEL *channel, int stream_id,
         /* if the transport layer said EAGAIN then we say so as well */
         return _libssh2_error(session, rc, "would block");
     }
-    else
+    else {
+        channel->read_avail -= bytes_read;
+        channel->remote.window_size -= bytes_read;
         /* make sure we remain in the created state to focus on emptying the
            data we already have in the packet brigade before we try to read
            more off the network again */
         channel->read_state = libssh2_NB_state_created;
+    }
 
     if(channel->remote.window_size < (LIBSSH2_CHANNEL_WINDOW_DEFAULT*30)) {
         /* the window is getting too narrow, expand it! */
diff --git a/src/libssh2_priv.h b/src/libssh2_priv.h
index 05b1ffc..461d14c 100644
--- a/src/libssh2_priv.h
+++ b/src/libssh2_priv.h
@@ -356,6 +356,8 @@ struct _LIBSSH2_CHANNEL
     libssh2_channel_data local, remote;
     /* Amount of bytes to be refunded to receive window (but not yet sent) */
     uint32_t adjust_queue;
+    /* Data immediately available for reading */
+    uint32_t read_avail;
 
     LIBSSH2_SESSION *session;
 
diff --git a/src/packet.c b/src/packet.c
index a4887c8..7887e61 100644
--- a/src/packet.c
+++ b/src/packet.c
@@ -654,8 +654,18 @@ _libssh2_packet_add(LIBSSH2_SESSION * session, unsigned char *data,
                 _libssh2_debug(session, LIBSSH2_TRACE_CONN,
                                "Ignoring extended data and refunding %d bytes",
                                (int) (datalen - 13));
-                session->packAdd_channelp = channelp;
+                if (channelp->read_avail + datalen - data_head >= channelp->remote.window_size)
+                    datalen = channelp->remote.window_size - channelp->read_avail + data_head;
 
+                channelp->remote.window_size -= datalen - data_head;
+                _libssh2_debug(session, LIBSSH2_TRACE_CONN,
+                               "shrinking window size by %lu bytes to %lu, read_avail %lu",
+                               datalen - data_head,
+                               channelp->remote.window_size,
+                               channelp->read_avail);
+
+                session->packAdd_channelp = channelp;
+                
                 /* Adjust the window based on the block we just freed */
               libssh2_packet_add_jump_point1:
                 session->packAdd_state = libssh2_NB_state_jump1;
@@ -685,7 +695,7 @@ _libssh2_packet_add(LIBSSH2_SESSION * session, unsigned char *data,
                                " to receive, truncating");
                 datalen = channelp->remote.packet_size + data_head;
             }
-            if (channelp->remote.window_size <= 0) {
+            if (channelp->remote.window_size <= channelp->read_avail) {
                 /*
                  * Spec says we MAY ignore bytes sent beyond
                  * window_size
@@ -701,17 +711,24 @@ _libssh2_packet_add(LIBSSH2_SESSION * session, unsigned char *data,
             /* Reset EOF status */
             channelp->remote.eof = 0;
 
-            if ((datalen - data_head) > channelp->remote.window_size) {
+            if (channelp->read_avail + datalen - data_head > channelp->remote.window_size) {
                 _libssh2_error(session,
                                LIBSSH2_ERROR_CHANNEL_WINDOW_EXCEEDED,
                                "Remote sent more data than current "
                                "window allows, truncating");
-                datalen = channelp->remote.window_size + data_head;
-                channelp->remote.window_size = 0;
+                datalen = channelp->remote.window_size - channelp->read_avail + data_head;
             }
-            else
-                /* Now that we've received it, shrink our window */
-                channelp->remote.window_size -= datalen - data_head;
+
+            /* Update the read_avail counter. The window size will be
+             * updated once the data is actually read from the queue
+             * from an upper layer */
+            channelp->read_avail += datalen - data_head;
+
+            _libssh2_debug(session, LIBSSH2_TRACE_CONN,
+                           "increasing read_avail by %lu bytes to %lu/%lu",
+                           (long)(datalen - data_head),
+                           (long)channelp->read_avail,
+                           (long)channelp->remote.window_size);
 
             break;
 
-- 
1.8.3.2

_______________________________________________
libssh2-devel http://cool.haxx.se/cgi-bin/mailman/listinfo/libssh2-devel

Reply via email to