tomaswolf commented on a change in pull request #181:
URL: https://github.com/apache/mina-sshd/pull/181#discussion_r583691938



##########
File path: 
sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
##########
@@ -52,60 +78,164 @@ public Object getId() {
     @Override
     public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
         if (isClosing()) {
-            throw new EOFException("Closed - state=" + state);
+            throw new EOFException("Closed/ing - state=" + state);
         }
 
+        waitForAvailableWriteSpace(buffer.available());
+
         IoWriteFutureImpl future = new IoWriteFutureImpl(getId(), buffer);
         writes.add(future);
         startWriting();
         return future;
     }
 
+    protected void waitForAvailableWriteSpace(int requiredSize) throws 
IOException {
+        long expireTime = System.currentTimeMillis() + 
maxWaitForPendingWrites.toMillis();
+        synchronized (pendingBytesCount) {
+            for (int count = pendingBytesCount.get();
+                 /*
+                  * The (count > 0) condition is put in place to allow a 
single pending
+                  * write to exceed the maxPendingBytesCount as long as there 
are no
+                  * other pending writes.
+                  */
+                 (count > 0)
+                         // Not already over the limit or about to be over it
+                         && ((count >= maxPendingBytesCount) || ((count + 
requiredSize) > maxPendingBytesCount))

Review comment:
       This could be just `count + requiredSize > maxPendingBytesCount`? (For 
requiredSize > 0, `count >= maxPendingBytesCount` already implies it.)

##########
File path: 
sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
##########
@@ -52,60 +78,164 @@ public Object getId() {
     @Override
     public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
         if (isClosing()) {
-            throw new EOFException("Closed - state=" + state);
+            throw new EOFException("Closed/ing - state=" + state);
         }
 
+        waitForAvailableWriteSpace(buffer.available());
+
         IoWriteFutureImpl future = new IoWriteFutureImpl(getId(), buffer);
         writes.add(future);
         startWriting();
         return future;
     }
 
+    protected void waitForAvailableWriteSpace(int requiredSize) throws 
IOException {
+        long expireTime = System.currentTimeMillis() + 
maxWaitForPendingWrites.toMillis();
+        synchronized (pendingBytesCount) {
+            for (int count = pendingBytesCount.get();
+                 /*
+                  * The (count > 0) condition is put in place to allow a 
single pending
+                  * write to exceed the maxPendingBytesCount as long as there 
are no
+                  * other pending writes.
+                  */
+                 (count > 0)
+                         // Not already over the limit or about to be over it
+                         && ((count >= maxPendingBytesCount) || ((count + 
requiredSize) > maxPendingBytesCount))
+                         // No pending exception signaled
+                         && (pendingException.get() == null);
+                 count = pendingBytesCount.get()) {
+                long remTime = expireTime - System.currentTimeMillis();
+                if (remTime <= 0L) {
+                    pendingException.compareAndSet(null,
+                            new SshChannelBufferedOutputException(
+                                    channelId,
+                                    "Max. pending write timeout expired after 
" + writtenBytesCount + " bytes"));
+                    throw pendingException.get();
+                }
+
+                try {
+                    pendingBytesCount.wait(remTime);
+                } catch (InterruptedException e) {
+                    pendingException.compareAndSet(null,
+                            new SshChannelBufferedOutputException(
+                                    channelId,
+                                    "Waiting for pending writes interrupted 
after " + writtenBytesCount + " bytes"));
+                    throw pendingException.get();

Review comment:
       I think the canonical treatment of InterruptedException, if you can't 
propagate it, is to at least restore the interrupted flag by calling 
Thread.currentThread().interrupt().

##########
File path: 
sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
##########
@@ -52,60 +78,164 @@ public Object getId() {
     @Override
     public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
         if (isClosing()) {
-            throw new EOFException("Closed - state=" + state);
+            throw new EOFException("Closed/ing - state=" + state);
         }
 
+        waitForAvailableWriteSpace(buffer.available());
+
         IoWriteFutureImpl future = new IoWriteFutureImpl(getId(), buffer);
         writes.add(future);
         startWriting();
         return future;
     }
 
+    protected void waitForAvailableWriteSpace(int requiredSize) throws 
IOException {
+        long expireTime = System.currentTimeMillis() + 
maxWaitForPendingWrites.toMillis();
+        synchronized (pendingBytesCount) {
+            for (int count = pendingBytesCount.get();
+                 /*
+                  * The (count > 0) condition is put in place to allow a 
single pending
+                  * write to exceed the maxPendingBytesCount as long as there 
are no
+                  * other pending writes.
+                  */
+                 (count > 0)
+                         // Not already over the limit or about to be over it
+                         && ((count >= maxPendingBytesCount) || ((count + 
requiredSize) > maxPendingBytesCount))
+                         // No pending exception signaled
+                         && (pendingException.get() == null);
+                 count = pendingBytesCount.get()) {
+                long remTime = expireTime - System.currentTimeMillis();
+                if (remTime <= 0L) {
+                    pendingException.compareAndSet(null,
+                            new SshChannelBufferedOutputException(
+                                    channelId,
+                                    "Max. pending write timeout expired after 
" + writtenBytesCount + " bytes"));
+                    throw pendingException.get();
+                }
+
+                try {
+                    pendingBytesCount.wait(remTime);
+                } catch (InterruptedException e) {
+                    pendingException.compareAndSet(null,
+                            new SshChannelBufferedOutputException(
+                                    channelId,
+                                    "Waiting for pending writes interrupted 
after " + writtenBytesCount + " bytes"));
+                    throw pendingException.get();
+                }
+            }
+
+            IOException e = pendingException.get();
+            if (e != null) {
+                throw e;
+            }
+
+            pendingBytesCount.addAndGet(requiredSize);
+        }
+    }
+
     protected void startWriting() throws IOException {
         IoWriteFutureImpl future = writes.peek();
+        // No more pending requests
         if (future == null) {
             return;
         }
 
+        // Don't try to write any further if pending exception signaled
+        Throwable pendingError = pendingException.get();
+        if (pendingError != null) {
+            log.error("startWriting({})[{}] propagate to {} write requests 
pending error={}[{}]",
+                    getId(), out, writes.size(), getClass().getSimpleName(), 
pendingError.getMessage());
+
+            IoWriteFutureImpl currentFuture = currentWrite.getAndSet(null);
+            for (IoWriteFutureImpl pendingWrite : writes) {
+                // Checking reference by design
+                if (GenericUtils.isSameReference(pendingWrite, currentFuture)) 
{
+                    continue;   // will be taken care of when its listener is 
eventually called
+                }
+
+                future.setValue(pendingError);
+            }
+
+            writes.clear();
+            return;
+        }
+
+        // Cannot honor this request yet since other pending one incomplete
         if (!currentWrite.compareAndSet(null, future)) {
             return;
         }
 
-        out.writeBuffer(future.getBuffer()).addListener(
-                new SshFutureListener<IoWriteFuture>() {
-                    @Override
-                    public void operationComplete(IoWriteFuture f) {
-                        if (f.isWritten()) {
-                            future.setValue(Boolean.TRUE);
-                        } else {
-                            future.setValue(f.getException());
-                        }
-                        finishWrite(future);
-                    }
-                });
+        Buffer buffer = future.getBuffer();
+        int bufferSize = buffer.available();
+        out.writeBuffer(buffer).addListener(new 
SshFutureListener<IoWriteFuture>() {
+            @Override
+            public void operationComplete(IoWriteFuture f) {
+                if (f.isWritten()) {
+                    future.setValue(Boolean.TRUE);
+                } else {
+                    future.setValue(f.getException());
+                }
+                finishWrite(future, bufferSize);
+            }
+        });
     }
 
-    protected void finishWrite(IoWriteFutureImpl future) {
+    protected void finishWrite(IoWriteFutureImpl future, int bufferSize) {
+        /*
+         * Update the pending bytes count only if successfully written,
+         * otherwise signal an error
+         */
+        if (future.isWritten()) {
+            long writtenSize = writtenBytesCount.addAndGet(bufferSize);
+
+            int stillPending;
+            synchronized (pendingBytesCount) {
+                stillPending = pendingBytesCount.addAndGet(0 - bufferSize);
+                pendingBytesCount.notifyAll();
+            }
+
+            if (stillPending < 0) {
+                log.error("finishWrite({})[{}] - pending byte counts underflow 
({}) after {} bytes", getId(), out, stillPending,
+                        writtenSize);
+                pendingException.compareAndSet(null,
+                        new SshChannelBufferedOutputException(channelId, 
"Pending byte counts underflow"));
+            }

Review comment:
       This makes me feel queasy. pendingBytesCount is decremented, then 
notified, and only then is the pendingException set. This means we may wake 
up a thread waiting in waitForAvailableWriteSpace(), which will quit the loop 
(since count <= 0), then may _not_ see the pending exception in line 127 (since 
here it is set outside the critical region), then add its bytes and happily 
continue with its write.
   
   Not sure that's what you intended.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to