lgoldstein commented on a change in pull request #181:
URL: https://github.com/apache/mina-sshd/pull/181#discussion_r583723637



##########
File path: 
sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
##########
@@ -52,60 +78,164 @@ public Object getId() {
     @Override
     public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
         if (isClosing()) {
-            throw new EOFException("Closed - state=" + state);
+            throw new EOFException("Closed/ing - state=" + state);
         }
 
+        waitForAvailableWriteSpace(buffer.available());
+
         IoWriteFutureImpl future = new IoWriteFutureImpl(getId(), buffer);
         writes.add(future);
         startWriting();
         return future;
     }
 
+    protected void waitForAvailableWriteSpace(int requiredSize) throws 
IOException {
+        long expireTime = System.currentTimeMillis() + 
maxWaitForPendingWrites.toMillis();
+        synchronized (pendingBytesCount) {
+            for (int count = pendingBytesCount.get();
+                 /*
+                  * The (count > 0) condition is put in place to allow a 
single pending
+                  * write to exceed the maxPendingBytesCount as long as there 
are no
+                  * other pending writes.
+                  */
+                 (count > 0)
+                         // Not already over the limit or about to be over it
+                         && ((count >= maxPendingBytesCount) || ((count + 
requiredSize) > maxPendingBytesCount))
+                         // No pending exception signaled
+                         && (pendingException.get() == null);
+                 count = pendingBytesCount.get()) {
+                long remTime = expireTime - System.currentTimeMillis();
+                if (remTime <= 0L) {
+                    pendingException.compareAndSet(null,
+                            new SshChannelBufferedOutputException(
+                                    channelId,
+                                    "Max. pending write timeout expired after 
" + writtenBytesCount + " bytes"));
+                    throw pendingException.get();
+                }
+
+                try {
+                    pendingBytesCount.wait(remTime);
+                } catch (InterruptedException e) {
+                    pendingException.compareAndSet(null,
+                            new SshChannelBufferedOutputException(
+                                    channelId,
+                                    "Waiting for pending writes interrupted 
after " + writtenBytesCount + " bytes"));
+                    throw pendingException.get();
+                }
+            }
+
+            IOException e = pendingException.get();
+            if (e != null) {
+                throw e;
+            }
+
+            pendingBytesCount.addAndGet(requiredSize);
+        }
+    }
+
     protected void startWriting() throws IOException {
         IoWriteFutureImpl future = writes.peek();
+        // No more pending requests
         if (future == null) {
             return;
         }
 
+        // Don't try to write any further if pending exception signaled
+        Throwable pendingError = pendingException.get();
+        if (pendingError != null) {
+            log.error("startWriting({})[{}] propagate to {} write requests 
pending error={}[{}]",
+                    getId(), out, writes.size(), getClass().getSimpleName(), 
pendingError.getMessage());
+
+            IoWriteFutureImpl currentFuture = currentWrite.getAndSet(null);
+            for (IoWriteFutureImpl pendingWrite : writes) {
+                // Checking reference by design
+                if (GenericUtils.isSameReference(pendingWrite, currentFuture)) 
{
+                    continue;   // will be taken care of when its listener is 
eventually called
+                }
+
+                future.setValue(pendingError);
+            }
+
+            writes.clear();
+            return;
+        }
+
+        // Cannot honor this request yet since other pending one incomplete
         if (!currentWrite.compareAndSet(null, future)) {
             return;
         }
 
-        out.writeBuffer(future.getBuffer()).addListener(
-                new SshFutureListener<IoWriteFuture>() {
-                    @Override
-                    public void operationComplete(IoWriteFuture f) {
-                        if (f.isWritten()) {
-                            future.setValue(Boolean.TRUE);
-                        } else {
-                            future.setValue(f.getException());
-                        }
-                        finishWrite(future);
-                    }
-                });
+        Buffer buffer = future.getBuffer();
+        int bufferSize = buffer.available();
+        out.writeBuffer(buffer).addListener(new 
SshFutureListener<IoWriteFuture>() {
+            @Override
+            public void operationComplete(IoWriteFuture f) {
+                if (f.isWritten()) {
+                    future.setValue(Boolean.TRUE);
+                } else {
+                    future.setValue(f.getException());
+                }
+                finishWrite(future, bufferSize);
+            }
+        });
     }
 
-    protected void finishWrite(IoWriteFutureImpl future) {
+    protected void finishWrite(IoWriteFutureImpl future, int bufferSize) {
+        /*
+         * Update the pending bytes count only if successfully written,
+         * otherwise signal an error
+         */
+        if (future.isWritten()) {
+            long writtenSize = writtenBytesCount.addAndGet(bufferSize);
+
+            int stillPending;
+            synchronized (pendingBytesCount) {
+                stillPending = pendingBytesCount.addAndGet(0 - bufferSize);
+                pendingBytesCount.notifyAll();
+            }
+
+            if (stillPending < 0) {
+                log.error("finishWrite({})[{}] - pending byte counts underflow 
({}) after {} bytes", getId(), out, stillPending,
+                        writtenSize);
+                pendingException.compareAndSet(null,
+                        new SshChannelBufferedOutputException(channelId, 
"Pending byte counts underflow"));
+            }

Review comment:
       Yes, I intended this - I am fine with a single pending write giving this 
mechanism "the slip". My goal was to avoid an OOM by having an unlimited 
accumulation of pending write requests due to fact that the peer is not 
consuming the sent data. Please note that the pending exception is "sticky" - 
i.e., the _next_ write attempt will fail. This also means that if the write 
request the "got away" was the last one by change and it was consumed by the 
peer there will be no exception thrown - which is also fine by me since as 
mentioned the goal is not to enforce a strict limit on the pending bytes size 
but rather on the accumulation of the pending write requests. I will document 
this in the code to make it clear.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to