lgoldstein commented on a change in pull request #181:
URL: https://github.com/apache/mina-sshd/pull/181#discussion_r583723637
##########
File path:
sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
##########
@@ -52,60 +78,164 @@ public Object getId() {
@Override
public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
if (isClosing()) {
- throw new EOFException("Closed - state=" + state);
+ throw new EOFException("Closed/ing - state=" + state);
}
+ waitForAvailableWriteSpace(buffer.available());
+
IoWriteFutureImpl future = new IoWriteFutureImpl(getId(), buffer);
writes.add(future);
startWriting();
return future;
}
+ protected void waitForAvailableWriteSpace(int requiredSize) throws
IOException {
+ long expireTime = System.currentTimeMillis() +
maxWaitForPendingWrites.toMillis();
+ synchronized (pendingBytesCount) {
+ for (int count = pendingBytesCount.get();
+ /*
+ * The (count > 0) condition is put in place to allow a
single pending
+ * write to exceed the maxPendingBytesCount as long as there
are no
+ * other pending writes.
+ */
+ (count > 0)
+ // Not already over the limit or about to be over it
+ && ((count >= maxPendingBytesCount) || ((count +
requiredSize) > maxPendingBytesCount))
+ // No pending exception signaled
+ && (pendingException.get() == null);
+ count = pendingBytesCount.get()) {
+ long remTime = expireTime - System.currentTimeMillis();
+ if (remTime <= 0L) {
+ pendingException.compareAndSet(null,
+ new SshChannelBufferedOutputException(
+ channelId,
+ "Max. pending write timeout expired after
" + writtenBytesCount + " bytes"));
+ throw pendingException.get();
+ }
+
+ try {
+ pendingBytesCount.wait(remTime);
+ } catch (InterruptedException e) {
+ pendingException.compareAndSet(null,
+ new SshChannelBufferedOutputException(
+ channelId,
+ "Waiting for pending writes interrupted
after " + writtenBytesCount + " bytes"));
+ throw pendingException.get();
+ }
+ }
+
+ IOException e = pendingException.get();
+ if (e != null) {
+ throw e;
+ }
+
+ pendingBytesCount.addAndGet(requiredSize);
+ }
+ }
+
protected void startWriting() throws IOException {
IoWriteFutureImpl future = writes.peek();
+ // No more pending requests
if (future == null) {
return;
}
+ // Don't try to write any further if pending exception signaled
+ Throwable pendingError = pendingException.get();
+ if (pendingError != null) {
+ log.error("startWriting({})[{}] propagate to {} write requests
pending error={}[{}]",
+ getId(), out, writes.size(), getClass().getSimpleName(),
pendingError.getMessage());
+
+ IoWriteFutureImpl currentFuture = currentWrite.getAndSet(null);
+ for (IoWriteFutureImpl pendingWrite : writes) {
+ // Checking reference by design
+ if (GenericUtils.isSameReference(pendingWrite, currentFuture))
{
+ continue; // will be taken care of when its listener is
eventually called
+ }
+
+ future.setValue(pendingError);
+ }
+
+ writes.clear();
+ return;
+ }
+
+ // Cannot honor this request yet since other pending one incomplete
if (!currentWrite.compareAndSet(null, future)) {
return;
}
- out.writeBuffer(future.getBuffer()).addListener(
- new SshFutureListener<IoWriteFuture>() {
- @Override
- public void operationComplete(IoWriteFuture f) {
- if (f.isWritten()) {
- future.setValue(Boolean.TRUE);
- } else {
- future.setValue(f.getException());
- }
- finishWrite(future);
- }
- });
+ Buffer buffer = future.getBuffer();
+ int bufferSize = buffer.available();
+ out.writeBuffer(buffer).addListener(new
SshFutureListener<IoWriteFuture>() {
+ @Override
+ public void operationComplete(IoWriteFuture f) {
+ if (f.isWritten()) {
+ future.setValue(Boolean.TRUE);
+ } else {
+ future.setValue(f.getException());
+ }
+ finishWrite(future, bufferSize);
+ }
+ });
}
- protected void finishWrite(IoWriteFutureImpl future) {
+ protected void finishWrite(IoWriteFutureImpl future, int bufferSize) {
+ /*
+ * Update the pending bytes count only if successfully written,
+ * otherwise signal an error
+ */
+ if (future.isWritten()) {
+ long writtenSize = writtenBytesCount.addAndGet(bufferSize);
+
+ int stillPending;
+ synchronized (pendingBytesCount) {
+ stillPending = pendingBytesCount.addAndGet(0 - bufferSize);
+ pendingBytesCount.notifyAll();
+ }
+
+ if (stillPending < 0) {
+ log.error("finishWrite({})[{}] - pending byte counts underflow
({}) after {} bytes", getId(), out, stillPending,
+ writtenSize);
+ pendingException.compareAndSet(null,
+ new SshChannelBufferedOutputException(channelId,
"Pending byte counts underflow"));
+ }
Review comment:
Yes, I intended this - I am fine with a single pending write giving this
mechanism "the slip". My goal was to avoid an OOM caused by an unlimited
accumulation of pending write requests due to the fact that the peer is not
consuming the sent data. Please note that the pending exception is "sticky" -
i.e., the _next_ write attempt will fail. This also means that if the write
request that "got away" happened to be the last one, and it was consumed by
the peer, no exception will be thrown - which is also fine by me since, as
mentioned, the goal is not to enforce a strict limit on the pending bytes size
but rather on the accumulation of the pending write requests. I will document
this in the code to make it clear.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]