lgoldstein commented on a change in pull request #181:
URL: https://github.com/apache/mina-sshd/pull/181#discussion_r583713918
##########
File path:
sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
##########
@@ -52,60 +78,164 @@ public Object getId() {
@Override
public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
if (isClosing()) {
- throw new EOFException("Closed - state=" + state);
+ throw new EOFException("Closed/ing - state=" + state);
}
+ waitForAvailableWriteSpace(buffer.available());
+
IoWriteFutureImpl future = new IoWriteFutureImpl(getId(), buffer);
writes.add(future);
startWriting();
return future;
}
+ protected void waitForAvailableWriteSpace(int requiredSize) throws IOException {
+ long expireTime = System.currentTimeMillis() + maxWaitForPendingWrites.toMillis();
+ synchronized (pendingBytesCount) {
+ for (int count = pendingBytesCount.get();
+ /*
+ * The (count > 0) condition is put in place to allow a single pending
+ * write to exceed the maxPendingBytesCount as long as there are no
+ * other pending writes.
+ */
+ (count > 0)
+ // Not already over the limit or about to be over it
+ && ((count >= maxPendingBytesCount) || ((count + requiredSize) > maxPendingBytesCount))
+ // No pending exception signaled
+ && (pendingException.get() == null);
+ count = pendingBytesCount.get()) {
+ long remTime = expireTime - System.currentTimeMillis();
+ if (remTime <= 0L) {
+ pendingException.compareAndSet(null,
+ new SshChannelBufferedOutputException(
+ channelId,
+ "Max. pending write timeout expired after " + writtenBytesCount + " bytes"));
+ throw pendingException.get();
+ }
+
+ try {
+ pendingBytesCount.wait(remTime);
+ } catch (InterruptedException e) {
+ pendingException.compareAndSet(null,
+ new SshChannelBufferedOutputException(
+ channelId,
+ "Waiting for pending writes interrupted after " + writtenBytesCount + " bytes"));
+ throw pendingException.get();
+ }
+ }
+
+ IOException e = pendingException.get();
+ if (e != null) {
+ throw e;
+ }
+
+ pendingBytesCount.addAndGet(requiredSize);
+ }
+ }
+
protected void startWriting() throws IOException {
IoWriteFutureImpl future = writes.peek();
+ // No more pending requests
if (future == null) {
return;
}
+ // Don't try to write any further if pending exception signaled
+ Throwable pendingError = pendingException.get();
+ if (pendingError != null) {
+ log.error("startWriting({})[{}] propagate to {} write requests pending error={}[{}]",
+ getId(), out, writes.size(), getClass().getSimpleName(), pendingError.getMessage());
+
+ IoWriteFutureImpl currentFuture = currentWrite.getAndSet(null);
+ for (IoWriteFutureImpl pendingWrite : writes) {
+ // Checking reference by design
+ if (GenericUtils.isSameReference(pendingWrite, currentFuture)) {
+ continue; // will be taken care of when its listener is eventually called
+ }
+
+ future.setValue(pendingError);
+ }
+
+ writes.clear();
+ return;
+ }
+
+ // Cannot honor this request yet since other pending one incomplete
if (!currentWrite.compareAndSet(null, future)) {
return;
}
- out.writeBuffer(future.getBuffer()).addListener(
- new SshFutureListener<IoWriteFuture>() {
- @Override
- public void operationComplete(IoWriteFuture f) {
- if (f.isWritten()) {
- future.setValue(Boolean.TRUE);
- } else {
- future.setValue(f.getException());
- }
- finishWrite(future);
- }
- });
+ Buffer buffer = future.getBuffer();
+ int bufferSize = buffer.available();
+ out.writeBuffer(buffer).addListener(new SshFutureListener<IoWriteFuture>() {
+ @Override
+ public void operationComplete(IoWriteFuture f) {
+ if (f.isWritten()) {
+ future.setValue(Boolean.TRUE);
+ } else {
+ future.setValue(f.getException());
+ }
+ finishWrite(future, bufferSize);
+ }
+ });
}
- protected void finishWrite(IoWriteFutureImpl future) {
+ protected void finishWrite(IoWriteFutureImpl future, int bufferSize) {
+ /*
+ * Update the pending bytes count only if successfully written,
+ * otherwise signal an error
+ */
+ if (future.isWritten()) {
+ long writtenSize = writtenBytesCount.addAndGet(bufferSize);
+
+ int stillPending;
+ synchronized (pendingBytesCount) {
+ stillPending = pendingBytesCount.addAndGet(0 - bufferSize);
+ pendingBytesCount.notifyAll();
+ }
+
+ if (stillPending < 0) {
+ log.error("finishWrite({})[{}] - pending byte counts underflow ({}) after {} bytes", getId(), out, stillPending,
+ writtenSize);
+ pendingException.compareAndSet(null,
+ new SshChannelBufferedOutputException(channelId, "Pending byte counts underflow"));
+ }
Review comment:
I'll review this code...
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]