[ 
https://issues.apache.org/jira/browse/SSHD-966?focusedWorklogId=761480&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761480
 ]

ASF GitHub Bot logged work on SSHD-966:
---------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Apr/22 17:21
            Start Date: 24/Apr/22 17:21
    Worklog Time Spent: 10m 
      Work Description: lgoldstein commented on code in PR #217:
URL: https://github.com/apache/mina-sshd/pull/217#discussion_r857153738


##########
sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java:
##########
@@ -70,6 +70,10 @@ public class ClientUserAuthService extends AbstractCloseable 
implements Service,
     private UserAuth userAuth;
     private int currentMethod;
 
+    private Object initLock = new Object();

Review Comment:
   Recommend making *initLock* `final`



##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java:
##########
@@ -90,52 +93,174 @@ public byte getCommandType() {
         return cmd;
     }
 
-    public void onWindowExpanded() throws IOException {
-        doWriteIfPossible(true);
-    }
-
+    /**
+     * {@inheritDoc}
+     *
+     * This write operation is <em>asynchronous</em>: if there is not enough 
window space, it may keep the write pending
+     * or write only part of the buffer and keep the rest pending. Concurrent 
writes are not allowed and will throw a
+     * {@link WritePendingException}. Any subsequent write <em>must</em> occur 
only once the returned future is
+     * fulfilled; for instance triggered via a listener on the returned 
future. Try to avoid doing a subsequent write
+     * directly in a future listener, though; doing so may lead to deep chains 
of nested listener calls with deep stack
+     * traces, and may ultimately lead to a stack overflow.
+     *
+     * @throws WritePendingException if a concurrent write is attempted
+     */
     @Override
-    public synchronized IoWriteFuture writeBuffer(Buffer buffer) throws 
IOException {
+    public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
         if (isClosing()) {
-            throw new EOFException("Closing: " + state);
+            throw new EOFException("Closing: " + writeState);
         }
 
         IoWriteFutureImpl future = new IoWriteFutureImpl(packetWriteId, 
buffer);
-        if (!pendingWrite.compareAndSet(null, future)) {
-            throw new WritePendingException("A write operation is already 
pending");
+        synchronized (writeState) {
+            if (writeState.isClosing()) { // Double check.
+                throw new EOFException("Closing: " + writeState);
+            }
+            if (writeState.writeInProgress) {
+                throw new WritePendingException("A write operation is already 
pending");
+            }
+            writeState.lastWrite = future;
+            writeState.pendingWrite = future;
+            writeState.writeInProgress = true;
+            writeState.waitingOnIo = false;
         }
         doWriteIfPossible(false);
         return future;
     }
 
     @Override
     protected void preClose() {
-        if (!(packetWriter instanceof Channel)) {
-            try {
-                packetWriter.close();
-            } catch (IOException e) {
-                error("preClose({}) Failed ({}) to pre-close packet writer: 
{}",
-                        this, e.getClass().getSimpleName(), e.getMessage(), e);
+        synchronized (writeState) {
+            writeState.openState = state.get();
+        }
+        super.preClose();
+    }
+
+    @Override
+    protected void doCloseImmediately() {
+        try {
+            // Can't close this in preClose(); a graceful close waits for the 
currently pending write to finish and thus
+            // still needs the packet writer.
+            if (!(packetWriter instanceof Channel)) {
+                try {
+                    packetWriter.close();
+                } catch (IOException e) {
+                    error("preClose({}) Failed ({}) to pre-close packet 
writer: {}",
+                            this, e.getClass().getSimpleName(), 
e.getMessage(), e);
+                }
             }
+            super.doCloseImmediately();
+        } finally {
+            shutdown();
         }
+    }
 
-        super.preClose();
+    private void shutdown() {
+        IoWriteFutureImpl current = null;
+        synchronized (writeState) {
+            writeState.openState = State.Closed;
+            current = writeState.pendingWrite;
+            writeState.pendingWrite = null;
+            writeState.waitingOnIo = false;
+        }
+        if (current != null) {
+            terminateFuture(current);
+        }
+    }
+
+    private void terminateFuture(IoWriteFutureImpl future) {

Review Comment:
   Let's make this *protected* in the spirit of allowing users to override 
anything they want in our code.



##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java:
##########
@@ -282,4 +435,43 @@ public boolean 
isSendChunkIfRemoteWindowIsSmallerThanPacketSize() {
     public void setSendChunkIfRemoteWindowIsSmallerThanPacketSize(boolean 
sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
         this.sendChunkIfRemoteWindowIsSmallerThanPacketSize = 
sendChunkIfRemoteWindowIsSmallerThanPacketSize;
     }
+
+    // Marker type to avoid repeated buffering
+    private static class BufferedFuture extends IoWriteFutureImpl {
+
+        BufferedFuture(Object id, Buffer buffer) {
+            super(id, buffer);
+        }
+    }
+
+    // Collects state variables; access is always synchronized on the single 
instance per stream.
+    private static class WriteState {

Review Comment:
   Let's make this *protected* (as well as its constructor, fields and methods) 
in the spirit of allowing users to override anything they want in our code.



##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java:
##########
@@ -171,43 +227,52 @@ public synchronized void write(byte[] buf, int s, int l) 
throws IOException {
                                 "Interrupted while waiting for remote space on 
write len=" + l + " to " + this)
                                         .initCause(e);
                     }
-                }
-                session.resetIdleTimeout();
-                continue;
+                    session.resetIdleTimeout();
+                    break;
+                default:
+                    // BUFFERED implies l == 0; outer loop will terminate
+                    break;

Review Comment:
   No need for *break* in *default* clause.



##########
sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.session.helpers;
+
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.security.GeneralSecurityException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.kex.KexState;
+import org.apache.sshd.common.util.ExceptionUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.slf4j.Logger;
+
+/**
+ * Manages SSH message sending during a key exchange. RFC 4253 specifies that 
during a key exchange, no high-level
+ * messages are to be sent, but a receiver must be able to deal with messages 
"in flight" until the peer's
+ * {@link SshConstants#SSH_MSG_KEX_INIT} message is received.
+ * <p>
+ * Apache MINA sshd queues up high-level messages that threads try to send 
while a key exchange is ongoing, and sends
+ * them once the key exchange is done. Sending queued messages may make the 
peer re-trigger a new key exchange, in which
+ * case sending queued messages stops and is resumed at the end of the new key 
exchange.
+ * </p>
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4253#section-7">RFC 4253</a>
+ */
+public class KeyExchangeMessageHandler {
+
+    // With asynchronous flushing we get a classic producer-consumer problem. 
The flushing thread is the single
+    // consumer, and there is a risk that it might get overrun by the 
producers. The classical solution of using a
+    // LinkedBlockingQueue with a fixed maximum capacity doesn't work: we 
cannot make the producers block when the queue
+    // is full; we might deadlock or be unable to handle any incoming message.
+    //
+    // We need an unbounded queue that never blocks the producers, but that 
manages to throttle them such that the
+    // flushing thread can actually finish, and we still can handle incoming 
messages (in particular also the peer's
+    // SSH_MSG_NEW_KEYS, since we start flushing already after having sent our 
own SSH_MSG_NEW_KEYS).
+    //
+    // This is achieved by giving the flushing thread priority over the 
threads that might enqueue additional packets
+    // and flushing at least two packets at a time. Additionally the flush 
loop releases and shortly afterwards
+    // re-acquires the write lock, so normally not many readers (i.e., 
writePacket() calls) will get a chance to enqueue
+    // new packets.
+
+    /**
+     * We need the flushing thread to have priority over writing threads. So 
we use a lock that favors writers over
+     * readers, and any state updates and the flushing thread are writers, 
while writePacket() is a reader.
+     */
+    private final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(false);

Review Comment:
   Let's make the members *protected* in the spirit of letting our users 
override whatever they like - e.g., provide their own derived handler.



##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java:
##########
@@ -42,6 +43,19 @@
  * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
  */
 public class ChannelOutputStream extends OutputStream implements 
java.nio.channels.Channel, ChannelHolder {
+
+    private enum WriteState {

Review Comment:
   Let's make this *protected* (or even *public*) in the spirit of allowing 
users to override anything they want in our code.



##########
sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java:
##########
@@ -124,7 +128,15 @@ public Map<String, Object> getProperties() {
 
     @Override
     public void start() {
-        // ignored
+        Runnable initial = null;

Review Comment:
   No need to initialize *initial* to *null* since its value is set 
unconditionally inside the *synchronized* block.



##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java:
##########
@@ -90,52 +93,174 @@ public byte getCommandType() {
         return cmd;
     }
 
-    public void onWindowExpanded() throws IOException {
-        doWriteIfPossible(true);
-    }
-
+    /**
+     * {@inheritDoc}
+     *
+     * This write operation is <em>asynchronous</em>: if there is not enough 
window space, it may keep the write pending
+     * or write only part of the buffer and keep the rest pending. Concurrent 
writes are not allowed and will throw a
+     * {@link WritePendingException}. Any subsequent write <em>must</em> occur 
only once the returned future is
+     * fulfilled; for instance triggered via a listener on the returned 
future. Try to avoid doing a subsequent write
+     * directly in a future listener, though; doing so may lead to deep chains 
of nested listener calls with deep stack
+     * traces, and may ultimately lead to a stack overflow.
+     *
+     * @throws WritePendingException if a concurrent write is attempted
+     */
     @Override
-    public synchronized IoWriteFuture writeBuffer(Buffer buffer) throws 
IOException {
+    public IoWriteFuture writeBuffer(Buffer buffer) throws IOException {
         if (isClosing()) {
-            throw new EOFException("Closing: " + state);
+            throw new EOFException("Closing: " + writeState);
         }
 
         IoWriteFutureImpl future = new IoWriteFutureImpl(packetWriteId, 
buffer);
-        if (!pendingWrite.compareAndSet(null, future)) {
-            throw new WritePendingException("A write operation is already 
pending");
+        synchronized (writeState) {
+            if (writeState.isClosing()) { // Double check.
+                throw new EOFException("Closing: " + writeState);
+            }
+            if (writeState.writeInProgress) {
+                throw new WritePendingException("A write operation is already 
pending");
+            }
+            writeState.lastWrite = future;
+            writeState.pendingWrite = future;
+            writeState.writeInProgress = true;
+            writeState.waitingOnIo = false;
         }
         doWriteIfPossible(false);
         return future;
     }
 
     @Override
     protected void preClose() {
-        if (!(packetWriter instanceof Channel)) {
-            try {
-                packetWriter.close();
-            } catch (IOException e) {
-                error("preClose({}) Failed ({}) to pre-close packet writer: 
{}",
-                        this, e.getClass().getSimpleName(), e.getMessage(), e);
+        synchronized (writeState) {
+            writeState.openState = state.get();
+        }
+        super.preClose();
+    }
+
+    @Override
+    protected void doCloseImmediately() {
+        try {
+            // Can't close this in preClose(); a graceful close waits for the 
currently pending write to finish and thus
+            // still needs the packet writer.
+            if (!(packetWriter instanceof Channel)) {
+                try {
+                    packetWriter.close();
+                } catch (IOException e) {
+                    error("preClose({}) Failed ({}) to pre-close packet 
writer: {}",
+                            this, e.getClass().getSimpleName(), 
e.getMessage(), e);
+                }
             }
+            super.doCloseImmediately();
+        } finally {
+            shutdown();
         }
+    }
 
-        super.preClose();
+    private void shutdown() {

Review Comment:
   Let's make this *protected* in the spirit of allowing users to override 
anything they want in our code.



##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java:
##########
@@ -50,12 +64,15 @@ public class ChannelOutputStream extends OutputStream 
implements java.nio.channe
     private final Duration maxWaitTimeout;
     private final byte cmd;
     private final boolean eofOnClose;
-    private final byte[] b = new byte[1];
-    private final AtomicBoolean closedState = new AtomicBoolean(false);
+    private final AtomicReference<OpenState> openState = new 
AtomicReference<>(OpenState.OPEN);
+
+    private final Object bufferLock = new Object();
     private Buffer buffer;
     private int bufferLength;
     private int lastSize;
-    private boolean noDelay;
+    private boolean isFlushing;
+
+    private volatile boolean noDelay;

Review Comment:
   Personally, I am not a big fan of *volatile* as a means of multi-threading 
safety - I prefer *AtomicBoolean*s - but I defer to your judgement here if you 
prefer *volatile*.



##########
sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java:
##########
@@ -712,6 +762,17 @@ protected boolean handleServiceRequest(String serviceName, 
Buffer buffer) throws
         return true;
     }
 
+    private boolean validateServiceKexState(KexState state) {

Review Comment:
   Let's make this *protected* in the spirit of letting our users override 
whatever they like.



##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java:
##########
@@ -42,6 +43,19 @@
  * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
  */
 public class ChannelOutputStream extends OutputStream implements 
java.nio.channels.Channel, ChannelHolder {
+
+    private enum WriteState {
+        BUFFERED,
+        NEED_FLUSH,
+        NEED_SPACE
+    }
+
+    private enum OpenState {

Review Comment:
   Let's make this *protected* (or even *public*) in the spirit of allowing 
users to override anything they want in our code.



##########
sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java:
##########
@@ -282,4 +435,43 @@ public boolean 
isSendChunkIfRemoteWindowIsSmallerThanPacketSize() {
     public void setSendChunkIfRemoteWindowIsSmallerThanPacketSize(boolean 
sendChunkIfRemoteWindowIsSmallerThanPacketSize) {
         this.sendChunkIfRemoteWindowIsSmallerThanPacketSize = 
sendChunkIfRemoteWindowIsSmallerThanPacketSize;
     }
+
+    // Marker type to avoid repeated buffering
+    private static class BufferedFuture extends IoWriteFutureImpl {

Review Comment:
   Let's make this *protected* (as well as its constructor and methods) in the 
spirit of allowing users to override anything they want in our code.



##########
sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.session.helpers;
+
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.security.GeneralSecurityException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.kex.KexState;
+import org.apache.sshd.common.util.ExceptionUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.slf4j.Logger;
+
+/**
+ * Manages SSH message sending during a key exchange. RFC 4253 specifies that 
during a key exchange, no high-level
+ * messages are to be sent, but a receiver must be able to deal with messages 
"in flight" until the peer's
+ * {@link SshConstants#SSH_MSG_KEX_INIT} message is received.
+ * <p>
+ * Apache MINA sshd queues up high-level messages that threads try to send 
while a key exchange is ongoing, and sends
+ * them once the key exchange is done. Sending queued messages may make the 
peer re-trigger a new key exchange, in which
+ * case sending queued messages stops and is resumed at the end of the new key 
exchange.
+ * </p>
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4253#section-7">RFC 4253</a>
+ */
+public class KeyExchangeMessageHandler {
+
+    // With asynchronous flushing we get a classic producer-consumer problem. 
The flushing thread is the single
+    // consumer, and there is a risk that it might get overrun by the 
producers. The classical solution of using a
+    // LinkedBlockingQueue with a fixed maximum capacity doesn't work: we 
cannot make the producers block when the queue
+    // is full; we might deadlock or be unable to handle any incoming message.
+    //
+    // We need an unbounded queue that never blocks the producers, but that 
manages to throttle them such that the
+    // flushing thread can actually finish, and we still can handle incoming 
messages (in particular also the peer's
+    // SSH_MSG_NEW_KEYS, since we start flushing already after having sent our 
own SSH_MSG_NEW_KEYS).
+    //
+    // This is achieved by giving the flushing thread priority over the 
threads that might enqueue additional packets
+    // and flushing at least two packets at a time. Additionally the flush 
loop releases and shortly afterwards
+    // re-acquires the write lock, so normally not many readers (i.e., 
writePacket() calls) will get a chance to enqueue
+    // new packets.
+
+    /**
+     * We need the flushing thread to have priority over writing threads. So 
we use a lock that favors writers over
+     * readers, and any state updates and the flushing thread are writers, 
while writePacket() is a reader.
+     */
+    private final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(false);
+
+    private final ExecutorService flushRunner = 
Executors.newSingleThreadExecutor();
+
+    private final AbstractSession session;
+
+    private final Logger log;
+
+    /**
+     * Queues up high-level packets written during an ongoing key exchange.
+     */
+    private final Queue<PendingWriteFuture> pendingPackets = new 
LinkedList<>();
+
+    /**
+     * Indicates that all pending packets have been flushed.
+     */
+    private boolean kexFlushed = true;
+
+    /**
+     * Never {@code null}. Used to block some threads when writing packets 
while pending packets are still being flushed
+     * at the end of a KEX to avoid overrunning the flushing thread. Always 
set, initially fulfilled. At the beginning
+     * of a KEX a new future is installed, which is fulfilled at the end of 
the KEX once there are no more pending
+     * packets to be flushed.
+     */
+    private DefaultKeyExchangeFuture kexFlushedFuture;
+
+    /**
+     * Creates a new {@link KeyExchangeMessageHandler} for the given {@code 
session}, using the given {@code Logger}.
+     *
+     * @param session {@link AbstractSession} the new instance belongs to
+     * @param log     {@link Logger} to use for writing log messages
+     */
+    public KeyExchangeMessageHandler(AbstractSession session, Logger log) {
+        this.session = Objects.requireNonNull(session);
+        this.log = Objects.requireNonNull(log);
+        // Start with a fulfilled kexFlushed future.
+        kexFlushedFuture = new DefaultKeyExchangeFuture(session.toString(), 
session.getFutureLock());
+        kexFlushedFuture.setValue(Boolean.TRUE);
+    }
+
+    public void updateState(Runnable update) {
+        lock.writeLock().lock();
+        try {
+            update.run();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public <V> V updateState(Supplier<V> update) {
+        lock.writeLock().lock();
+        try {
+            return update.get();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Initializes the state for a new key exchange. {@link 
#allPacketsFlushed()} will be {@code false}, and a new
+     * future to be fulfilled when all queued packets will be flushed once the 
key exchange is done is set. The
+     * currently set future from an earlier key exchange is returned. The 
returned future may or may not be fulfilled;
+     * if it isn't, there are still left-over pending packets to write from 
the previous key exchange, which will be
+     * written once the new key exchange flushes pending packets.
+     *
+     * @return the previous {@link DefaultKeyExchangeFuture} indicating 
whether all pending packets were flushed.
+     */
+    public DefaultKeyExchangeFuture initNewKeyExchange() {
+        return updateState(() -> {
+            kexFlushed = false;
+            DefaultKeyExchangeFuture oldFuture = kexFlushedFuture;
+            kexFlushedFuture = new 
DefaultKeyExchangeFuture(session.toString(), session.getFutureLock());
+            return oldFuture;
+        });
+    }
+
+    /**
+     * To be called when the key exchange is done. If there are any pending 
packets, returns a future that will be
+     * fulfilled when {@link #flushQueue(DefaultKeyExchangeFuture)} with that 
future as argument has flushed all pending
+     * packets, if there are any.
+     *
+     * @return the current {@link DefaultKeyExchangeFuture} and the number of 
currently pending packets
+     */
+    public SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture> 
terminateKeyExchange() {
+        return updateState(() -> {
+            int numPending = pendingPackets.size();
+            if (numPending == 0) {
+                kexFlushed = true;
+            }
+            return new SimpleImmutableEntry<Integer, 
DefaultKeyExchangeFuture>(Integer.valueOf(numPending), kexFlushedFuture);
+        });
+    }
+
+    /**
+     * Pretends all pending packets had been written. To be called when the 
{@link AbstractSession} closes.
+     */
+    public void shutdown() {
+        SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture> items = 
updateState(() -> {
+            kexFlushed = true;
+            return new SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture>(
+                    Integer.valueOf(pendingPackets.size()),
+                    kexFlushedFuture);
+        });
+        items.getValue().setValue(Boolean.valueOf(items.getKey().intValue() == 
0));
+        flushRunner.shutdownNow();
+    }
+
+    /**
+     * Writes a packet. If a key exchange is ongoing, only low-level messages 
are written directly; all other messages
+     * are queued and will be written once {@link 
#flushQueue(DefaultKeyExchangeFuture)} is called when the key exchange
+     * is done. Packets written while there are still pending packets to be 
flushed will either be queued, too, or the
+     * calling thread will be blocked with the given timeout until all packets 
have been flushed. Whether a write will
+     * be blocked is determined by {@link #isBlockAllowed(int)}.
+     * <p>
+     * If {@code timeout <= 0} or {@code unit == null}, a time-out of 
"forever" is assumed. Note that a timeout applies
+     * only if the calling thread is blocked.
+     * </p>
+     *
+     * @param  buffer      packet to write
+     * @param  timeout     number of {@link TimeUnit}s to wait at most if the 
calling thread is blocked
+     * @param  unit        {@link TimeUnit} of {@code timeout}
+     * @return             an {@link IoWriteFuture} that will be fulfilled 
once the packet has indeed been written.
+     * @throws IOException if an error occurs
+     */
+    public IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit 
unit) throws IOException {
+        // While exchanging key, queue high level packets.
+        byte[] bufData = buffer.array();
+        int cmd = bufData[buffer.rpos()] & 0xFF;
+        boolean enqueued = false;
+        boolean isLowLevelMessage = cmd <= SshConstants.SSH_MSG_KEX_LAST && 
cmd != SshConstants.SSH_MSG_SERVICE_REQUEST
+                && cmd != SshConstants.SSH_MSG_SERVICE_ACCEPT;
+        try {
+            if (isLowLevelMessage) {
+                // Low-level messages can always be sent.
+                return session.doWritePacket(buffer);
+            }
+            IoWriteFuture future = writeOrEnqueue(cmd, buffer, timeout, unit);
+            enqueued = future instanceof PendingWriteFuture;
+            return future;
+        } finally {
+            session.resetIdleTimeout();
+            if (!enqueued) {
+                try {
+                    session.checkRekey();
+                } catch (GeneralSecurityException e) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("writePacket({}) failed ({}) to check 
re-key: {}", session, e.getClass().getSimpleName(),
+                                e.getMessage(), e);
+                    }
+                    throw ValidateUtils.initializeExceptionCause(new 
ProtocolException(
+                            "Failed (" + e.getClass().getSimpleName() + ")" + 
" to check re-key necessity: " + e.getMessage()),
+                            e);
+                } catch (Exception e) {
+                    ExceptionUtils.rethrowAsIoException(e);
+                }
+            }
+        }
+    }
+
+    private IoWriteFuture writeOrEnqueue(int cmd, Buffer buffer, long timeout, 
TimeUnit unit) throws IOException {

Review Comment:
   Let's make this *protected* in the spirit of letting our users override 
whatever they like.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 761480)
    Time Spent: 3h 50m  (was: 3h 40m)

> Deadlock on disconnection at the end of key-exchange
> ----------------------------------------------------
>
>                 Key: SSHD-966
>                 URL: https://issues.apache.org/jira/browse/SSHD-966
>             Project: MINA SSHD
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Francois Ferrand
>            Assignee: Lyor Goldstein
>            Priority: Major
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> We are using git-repo to download projects from Gerrit server, using SSH.
> Gerrit is at version 2.16.16, which uses SSHD 2.0.0 and Mina 2.0.17 with the 
> NIO2 backend.
> One particularity of this setup is that git-repo creates a single control 
> master channel, and then downloads *lots* of Git repositories (500 
> repositories, some of them relatively large), with some degree of 
> parallelism. This takes a long time, lots of data, and the multiplexed 
> connections are handled by gerrit in multiple threads.
> In some cases, we experience a deadlock when an error happens at the end of 
> the key exchange, while sending pending packets:
> {noformat}
> Warning, the following threads are deadlocked : SSH git-upload-pack /project1 
> (myuser), sshd-SshServer[df5f657]-nio2-thread-3
> "SSH git-upload-pack /project1 (myuser)" prio=1 BLOCKED
>       
> org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107)
>       
> org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798)
>       
> org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227)
>       
> org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127)
>       
> org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183)
>       
> org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174)
>       
> org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153)
>       
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132)
>       
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614)
>       
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433)
>       
> org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221)
>       
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644)
>       
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621)
>       
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171)
>       
> org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229)
>       
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609)
>       
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597)
>       
> org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154)
>       org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133)
>       org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947)
>       org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971)
>       org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776)
>       com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77)
>       
> com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98)
>       
> com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31)
>       
> com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63)
>       com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467)
>       
> com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83)
>       java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646)
>       
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       java.lang.Thread.run(Thread.java:748)
> "sshd-SshServer[df5f657]-nio2-thread-3" daemon prio=5 BLOCKED
>       
> org.apache.sshd.common.channel.ChannelOutputStream.close(ChannelOutputStream.java:249)
>       org.apache.sshd.common.util.io.IoUtils.closeQuietly(IoUtils.java:151)
>       
> org.apache.sshd.server.channel.ChannelSession.doCloseImmediately(ChannelSession.java:205)
>       
> org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>       
> org.apache.sshd.common.channel.AbstractChannel.close(AbstractChannel.java:576)
>       
> org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65)
>       
> org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>       
> org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46)
>       
> org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>       
> org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65)
>       
> org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>       
> org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:56)
>       
> org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:45)
>       
> org.apache.sshd.common.util.closeable.SequentialCloseable.doClose(SequentialCloseable.java:69)
>       
> org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>       
> org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46)
>       
> org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>       
> org.apache.sshd.common.session.helpers.AbstractSession.exceptionCaught(AbstractSession.java:973)
>       
> org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.exceptionCaught(AbstractSessionIoHandler.java:53)
>       
> org.apache.sshd.common.io.nio2.Nio2Session.exceptionCaught(Nio2Session.java:186)
>       
> org.apache.sshd.common.io.nio2.Nio2Session.handleWriteCycleFailure(Nio2Session.java:460)
>       
> org.apache.sshd.common.io.nio2.Nio2Session$2.onFailed(Nio2Session.java:415)
>       
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$failed$1(Nio2CompletionHandler.java:46)
>       
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$627/1874891543.run(Unknown
>  Source)
>       java.security.AccessController.doPrivileged(Native Method)
>       
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.failed(Nio2CompletionHandler.java:45)
>       sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:128)
>       sun.nio.ch.Invoker.invokeDirect(Invoker.java:157)
>       
> sun.nio.ch.UnixAsynchronousSocketChannelImpl.implWrite(UnixAsynchronousSocketChannelImpl.java:736)
>       
> sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:382)
>       
> sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:399)
>       
> org.apache.sshd.common.io.nio2.Nio2Session.doWriteCycle(Nio2Session.java:401)
>       
> org.apache.sshd.common.io.nio2.Nio2Session.startWriting(Nio2Session.java:386)
>       
> org.apache.sshd.common.io.nio2.Nio2Session.writePacket(Nio2Session.java:169)
>       
> org.apache.sshd.common.session.helpers.AbstractSession.doWritePacket(AbstractSession.java:1181)
>       
> org.apache.sshd.common.session.helpers.AbstractSession.sendPendingPackets(AbstractSession.java:910)
>       
> org.apache.sshd.common.session.helpers.AbstractSession.handleNewKeys(AbstractSession.java:875)
>       
> org.apache.sshd.common.session.helpers.AbstractSession.doHandleMessage(AbstractSession.java:606)
>       
> org.apache.sshd.common.session.helpers.AbstractSession.handleMessage(AbstractSession.java:555)
>       
> org.apache.sshd.common.session.helpers.AbstractSession.decode(AbstractSession.java:1527)
>       
> org.apache.sshd.common.session.helpers.AbstractSession.messageReceived(AbstractSession.java:516)
>       
> org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.messageReceived(AbstractSessionIoHandler.java:63)
>       
> org.apache.sshd.common.io.nio2.Nio2Session.handleReadCycleCompletion(Nio2Session.java:339)
>       
> org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:318)
>       
> org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:315)
>       
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$completed$0(Nio2CompletionHandler.java:38)
>       
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$349/1690110070.run(Unknown
>  Source)
>       java.security.AccessController.doPrivileged(Native Method)
>       
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.completed(Nio2CompletionHandler.java:37)
>       sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
>       sun.nio.ch.Invoker$2.run(Invoker.java:218)
>       
> sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
>       
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       java.lang.Thread.run(Thread.java:748)
> "SSH git-upload-pack /project2 (myuser)" prio=1 BLOCKED
>       
> org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107)
>       
> org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798)
>       
> org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227)
>       
> org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127)
>       
> org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183)
>       
> org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174)
>       
> org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153)
>       
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132)
>       
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614)
>       
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433)
>       
> org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221)
>       
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644)
>       
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621)
>       
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171)
>       
> org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229)
>       
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609)
>       
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597)
>       
> org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154)
>       org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133)
>       org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947)
>       org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971)
>       org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776)
>       com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77)
>       
> com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98)
>       
> com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31)
>       
> com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63)
>       com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467)
>       
> com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83)
>       java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646)
>       
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       java.lang.Thread.run(Thread.java:748)
> {noformat}
> In AbstractSession.handleNewKeys(), a lock is held on {{pendingPackets}} 
> while the pending packets are being sent; when sending the packets fails, an 
> exception is generated, which causes the channel to get closed by calling 
> ChannelOutputStream.close(), which is a synchronized method.
> At the same time, some other threads are trying to write data to the session: 
> they call ChannelOutputStream.write(), which is also a synchronized method 
> and which in turn calls AbstractSession.writePacket(), which attempts to lock 
> {{pendingPackets}} to queue the packets.
> *NOTE*: in our setup, we can reproduce this quite easily by increasing the 
> parallelism in git-repo and reducing the values of the waitTimeout 
> (WAIT_FOR_SPACE_TIMEOUT), rekeyBytesLimit (REKEY_BYTES_LIMIT) and 
> rekeyTimeLimit (REKEY_TIME_LIMIT) configuration options.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to