Repository: mina-sshd
Updated Branches:
  refs/heads/master 56cc5356f -> 88c0c819d


[SSHD-580] getInvertedIn.close() does not close stdin of remote program


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/88c0c819
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/88c0c819
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/88c0c819

Branch: refs/heads/master
Commit: 88c0c819dab1d1c1c59240433135562c3ca1f5a7
Parents: 56cc535
Author: Lyor Goldstein <[email protected]>
Authored: Thu Jan 21 13:50:31 2016 +0200
Committer: Lyor Goldstein <[email protected]>
Committed: Thu Jan 21 13:50:31 2016 +0200

----------------------------------------------------------------------
 .../sshd/agent/local/AgentForwardedChannel.java |  7 +-
 .../agent/local/ChannelAgentForwarding.java     | 13 ++-
 .../sshd/agent/unix/AgentForwardedChannel.java  |  7 +-
 .../sshd/agent/unix/ChannelAgentForwarding.java | 14 ++-
 .../client/channel/AbstractClientChannel.java   |  6 +-
 .../sshd/client/channel/ChannelDirectTcpip.java |  2 +-
 .../sshd/client/channel/ChannelSession.java     |  4 +-
 .../sshd/common/channel/AbstractChannel.java    | 99 ++++++++++++++++----
 .../org/apache/sshd/common/channel/Channel.java |  6 ++
 .../sshd/common/channel/ChannelListener.java    |  4 +-
 .../common/channel/ChannelOutputStream.java     | 15 ++-
 .../common/channel/OpenChannelException.java    | 53 +++++++++++
 .../sshd/common/forward/TcpipClientChannel.java |  2 +-
 .../session/AbstractConnectionService.java      | 44 +++++++--
 .../server/channel/AbstractServerChannel.java   |  3 +-
 .../sshd/server/channel/ChannelSession.java     |  8 +-
 .../server/channel/OpenChannelException.java    | 53 -----------
 .../sshd/server/forward/TcpipServerChannel.java | 18 ++--
 .../sshd/server/x11/X11ForwardSupport.java      |  2 +-
 .../client/ClientAuthenticationManagerTest.java |  2 +-
 .../java/org/apache/sshd/client/ClientTest.java | 46 ++++++---
 .../common/channel/TestChannelListener.java     |  2 +-
 22 files changed, 270 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
index 8bc292b..377798e 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
@@ -30,6 +30,7 @@ import org.apache.sshd.agent.common.AbstractAgentProxy;
 import org.apache.sshd.client.channel.AbstractClientChannel;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.ChannelOutputStream;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 
@@ -85,10 +86,8 @@ public class AgentForwardedChannel extends 
AbstractClientChannel {
 
     @Override
     protected void doOpen() throws IOException {
-        if (streaming == Streaming.Async) {
-            throw new IllegalArgumentException("Asynchronous streaming isn't 
supported yet on this channel");
-        }
-        invertedIn = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA);
+        ValidateUtils.checkTrue(!Streaming.Async.equals(streaming), 
"Asynchronous streaming isn't supported yet on this channel");
+        invertedIn = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA, true);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
 
b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
index 929361f..cf0f707 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
@@ -56,7 +56,7 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
         final OpenFuture f = new DefaultOpenFuture(this);
         ChannelListener listener = getChannelListenerProxy();
         try {
-            out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA);
+            out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA, true);
 
             Session session = getSession();
             FactoryManager manager = 
ValidateUtils.checkNotNull(session.getFactoryManager(), "No factory manager");
@@ -70,9 +70,13 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
             Throwable e = GenericUtils.peelException(t);
             try {
                 listener.channelOpenFailure(this, e);
-            } catch (Throwable ignored) {
+            } catch (Throwable err) {
+                Throwable ignored = GenericUtils.peelException(err);
                 log.warn("doInit({}) failed ({}) to inform listener of open 
failure={}: {}",
                          this, ignored.getClass().getSimpleName(), 
e.getClass().getSimpleName(), ignored.getMessage());
+                if (log.isDebugEnabled()) {
+                    log.debug("doInit(" + this + ") inform listener open 
failure details", ignored);
+                }
             }
             f.setException(e);
         }
@@ -100,11 +104,6 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
     }
 
     @Override
-    public void handleEof() throws IOException {
-        super.handleEof();
-    }
-
-    @Override
     protected void doWriteData(byte[] data, int off, int len) throws 
IOException {
         client.messageReceived(new ByteArrayBuffer(data, off, len));
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java 
b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
index 8ed5c07..4cf3ad8 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.sshd.client.channel.AbstractClientChannel;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.ChannelOutputStream;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.tomcat.jni.Socket;
 import org.apache.tomcat.jni.Status;
 
@@ -58,10 +59,8 @@ public class AgentForwardedChannel extends 
AbstractClientChannel implements Runn
 
     @Override
     protected synchronized void doOpen() throws IOException {
-        if (streaming == Streaming.Async) {
-            throw new IllegalArgumentException("Asynchronous streaming isn't 
supported yet on this channel");
-        }
-        invertedIn = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA);
+        ValidateUtils.checkTrue(!Streaming.Async.equals(streaming), 
"Asynchronous streaming isn't supported yet on this channel");
+        invertedIn = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA, true);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
 
b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
index ab83c1f..3093321 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
@@ -77,7 +77,7 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
         final OpenFuture f = new DefaultOpenFuture(this);
         ChannelListener listener = getChannelListenerProxy();
         try {
-            out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA);
+            out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA, true);
             authSocket = PropertyResolverUtils.getString(this, 
SshAgent.SSH_AUTHSOCKET_ENV_NAME);
             pool = Pool.create(AprLibrary.getInstance().getRootPool());
             handle = Local.create(authSocket, pool);
@@ -119,9 +119,13 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
             Throwable e = GenericUtils.peelException(t);
             try {
                 listener.channelOpenFailure(this, e);
-            } catch (Throwable ignored) {
+            } catch (Throwable err) {
+                Throwable ignored = GenericUtils.peelException(err);
                 log.warn("doInit({}) failed ({}) to inform listener of open 
failure={}: {}",
                          this, ignored.getClass().getSimpleName(), 
e.getClass().getSimpleName(), ignored.getMessage());
+                if (log.isDebugEnabled()) {
+                    log.debug("doInit(" + this + ") inform listener open 
failure details", ignored);
+                }
             }
             f.setException(e);
         }
@@ -173,12 +177,6 @@ public class ChannelAgentForwarding extends 
AbstractServerChannel {
     }
 
     @Override
-    public void handleEof() throws IOException {
-        super.handleEof();
-//        close(true);
-    }
-
-    @Override
     protected void doWriteData(byte[] data, int off, int len) throws 
IOException {
         int result = Socket.send(handle, data, off, len);
         if (result < Status.APR_SUCCESS) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 9ae646f..5673280 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -320,9 +320,13 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
             Throwable e = GenericUtils.peelException(t);
             try {
                 listener.channelOpenFailure(this, e);
-            } catch (Throwable ignored) {
+            } catch (Throwable err) {
+                Throwable ignored = GenericUtils.peelException(err);
                 log.warn("handleOpenSuccess({}) failed ({}) to inform listener 
of open failure={}: {}",
                          this, ignored.getClass().getSimpleName(), 
e.getClass().getSimpleName(), ignored.getMessage());
+                if (log.isDebugEnabled()) {
+                    log.debug("handleOpenSuccess(" + this + ") inform listener 
open failure details", ignored);
+                }
             }
 
             this.openFuture.setException(e);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
index 945d431..81f733b 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
@@ -96,7 +96,7 @@ public class ChannelDirectTcpip extends AbstractClientChannel 
{
             asyncIn = new ChannelAsyncOutputStream(this, 
SshConstants.SSH_MSG_CHANNEL_DATA);
             asyncOut = new ChannelAsyncInputStream(this);
         } else {
-            out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA);
+            out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA, true);
             invertedIn = out;
             ChannelPipedInputStream pis = new ChannelPipedInputStream(this, 
localWindow);
             pipe = new ChannelPipedOutputStream(pis);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index 91fe1e3..aa50f3e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -52,7 +52,7 @@ public class ChannelSession extends AbstractClientChannel {
 
     @Override
     protected void doOpen() throws IOException {
-        if (streaming == Streaming.Async) {
+        if (Streaming.Async.equals(streaming)) {
             asyncIn = new ChannelAsyncOutputStream(this, 
SshConstants.SSH_MSG_CHANNEL_DATA) {
                 @SuppressWarnings("synthetic-access")
                 @Override
@@ -69,7 +69,7 @@ public class ChannelSession extends AbstractClientChannel {
             asyncOut = new ChannelAsyncInputStream(this);
             asyncErr = new ChannelAsyncInputStream(this);
         } else {
-            invertedIn = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA);
+            invertedIn = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA, true);
             if (out == null) {
                 ChannelPipedInputStream pis = new 
ChannelPipedInputStream(this, localWindow);
                 ChannelPipedOutputStream pos = new 
ChannelPipedOutputStream(pis);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 51eae39..62caf9f 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -77,7 +77,9 @@ public abstract class AbstractChannel
     protected final Window localWindow;
     protected final Window remoteWindow;
     protected ConnectionService service;
-    protected final AtomicBoolean eof = new AtomicBoolean(false);
+    protected final AtomicBoolean initialized = new AtomicBoolean(false);
+    protected final AtomicBoolean eofReceived = new AtomicBoolean(false);
+    protected final AtomicBoolean eofSent = new AtomicBoolean(false);
     protected AtomicReference<GracefulState> gracefulState = new 
AtomicReference<GracefulState>(GracefulState.Opened);
     protected final DefaultCloseFuture gracefulFuture = new 
DefaultCloseFuture(lock);
     protected final List<RequestHandler<Channel>> handlers = new 
ArrayList<RequestHandler<Channel>>();
@@ -327,11 +329,17 @@ public abstract class AbstractChannel
             listener.channelInitialized(this);
         } catch (RuntimeException t) {
             Throwable e = GenericUtils.peelException(t);
-            throw new IOException("Failed (" + e.getClass().getSimpleName() + 
") to notify channel " + toString() + " initialization: " + e.getMessage(), e);
+            throw new IOException("Failed (" + e.getClass().getSimpleName() + 
") to notify channel " + this + " initialization: " + e.getMessage(), e);
         }
         // delegate the rest of the notifications to the channel
         addChannelListener(listener);
         configureWindow();
+        initialized.set(true);
+    }
+
+    @Override
+    public boolean isInitialized() {
+        return initialized.get();
     }
 
     protected void notifyStateChanged() {
@@ -373,8 +381,15 @@ public abstract class AbstractChannel
     @Override
     public void handleClose() throws IOException {
         if (log.isDebugEnabled()) {
-            log.debug("handleClose({}) SSH_MSG_CHANNEL_CLOSE on channel", 
this);
+            log.debug("handleClose({}) SSH_MSG_CHANNEL_CLOSE", this);
+        }
+
+        if (!eofSent.getAndSet(true)) {
+            if (log.isDebugEnabled()) {
+                log.debug("handleClose({}) prevent sending EOF", this);
+            }
         }
+
         if (gracefulState.compareAndSet(GracefulState.Opened, 
GracefulState.CloseReceived)) {
             close(false);
         } else if (gracefulState.compareAndSet(GracefulState.CloseSent, 
GracefulState.Closed)) {
@@ -383,12 +398,22 @@ public abstract class AbstractChannel
     }
 
     @Override
+    public CloseFuture close(boolean immediately) {
+        if (!eofSent.getAndSet(true)) {
+            if (log.isDebugEnabled()) {
+                log.debug("close({}) prevent sending EOF", this);
+            }
+        }
+
+        return super.close(immediately);
+    }
+
+    @Override
     protected Closeable getInnerCloseable() {
         return new GracefulChannelCloseable();
     }
 
     public class GracefulChannelCloseable extends IoBaseCloseable {
-
         private final AtomicBoolean closing = new AtomicBoolean(false);
 
         public GracefulChannelCloseable() {
@@ -413,13 +438,17 @@ public abstract class AbstractChannel
         public CloseFuture close(final boolean immediately) {
             final Channel channel = AbstractChannel.this;
             if (log.isDebugEnabled()) {
-                log.debug("close({})[immediately={}] SSH_MSG_CHANNEL_CLOSE on 
channel", channel, immediately);
+                log.debug("close({})[immediately={}] processing", channel, 
immediately);
             }
 
             setClosing(true);
             if (immediately) {
                 gracefulFuture.setClosed();
             } else if (!gracefulFuture.isClosed()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("close({})[immediately={}] send 
SSH_MSG_CHANNEL_CLOSE", channel, immediately);
+                }
+
                 Session s = getSession();
                 Buffer buffer = 
s.createBuffer(SshConstants.SSH_MSG_CHANNEL_CLOSE, Short.SIZE);
                 buffer.putInt(getRecipient());
@@ -452,6 +481,10 @@ public abstract class AbstractChannel
                         log.debug("close({})[immediately={}] {} while writing 
SSH_MSG_CHANNEL_CLOSE packet on channel: {}",
                                   channel, immediately, 
e.getClass().getSimpleName(), e.getMessage());
                     }
+
+                    if (log.isTraceEnabled()) {
+                        log.trace("close(" + channel + ")[immediately=" + 
immediately + "] packet write failure details", e);
+                    }
                     channel.close(true);
                 }
             }
@@ -473,10 +506,13 @@ public abstract class AbstractChannel
     protected void preClose() {
         ChannelListener listener = getChannelListenerProxy();
         try {
-            listener.channelClosed(this);
+            listener.channelClosed(this, null);
         } catch (RuntimeException t) {
             Throwable e = GenericUtils.peelException(t);
-            log.warn(e.getClass().getSimpleName() + " while signal channel " + 
toString() + " closed: " + e.getMessage(), e);
+            log.warn("preClose({}) {} while signal channel closed: {}", this, 
e.getClass().getSimpleName(), e.getMessage());
+            if (log.isDebugEnabled()) {
+                log.debug("preClose(" + this + ") channel closed signalling 
failure details", e);
+            }
         } finally {
             // clear the listeners since we are closing the channel (quicker 
GC)
             this.channelListeners.clear();
@@ -531,6 +567,10 @@ public abstract class AbstractChannel
         if (log.isTraceEnabled()) {
             log.trace("handleData({}) data: {}", this, 
BufferUtils.printHex(buffer.array(), buffer.rpos(), len));
         }
+        if (isEofSignalled()) {
+            // TODO consider throwing an exception
+            log.warn("handleData({}) extra {} bytes sent after EOF", this, 
len);
+        }
         doWriteData(buffer.array(), buffer.rpos(), len);
     }
 
@@ -558,26 +598,30 @@ public abstract class AbstractChannel
         if (log.isTraceEnabled()) {
             log.trace("handleExtendedData({}) extended data: {}", this, 
BufferUtils.printHex(buffer.array(), buffer.rpos(), len));
         }
+        if (isEofSignalled()) {
+            // TODO consider throwing an exception
+            log.warn("handleExtendedData({}) extra {} bytes sent after EOF", 
this, len);
+        }
         doWriteExtendedData(buffer.array(), buffer.rpos(), len);
     }
 
-    public boolean isEofSignalled() {
-        return eof.get();
-    }
-
-    public void setEofSignalled(boolean on) {
-        eof.set(on);
-    }
-
     @Override
     public void handleEof() throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("handleEof({}) SSH_MSG_CHANNEL_EOF", this);
+        if (eofReceived.getAndSet(true)) {
+            // TODO consider throwing an exception
+            log.warn("handleEof({}) already signalled", this);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("handleEof({}) SSH_MSG_CHANNEL_EOF", this);
+            }
         }
-        setEofSignalled(true);
         notifyStateChanged();
     }
 
+    public boolean isEofSignalled() {
+        return eofReceived.get();
+    }
+
     @Override
     public void handleWindowAdjust(Buffer buffer) throws IOException {
         int window = buffer.getInt();
@@ -607,15 +651,34 @@ public abstract class AbstractChannel
     protected abstract void doWriteExtendedData(byte[] data, int off, int len) 
throws IOException;
 
     protected void sendEof() throws IOException {
+        if (eofSent.getAndSet(true)) {
+            if (log.isDebugEnabled()) {
+                log.debug("sendEof({}) already sent", this);
+            }
+            return;
+        }
+
+        if (isClosing()) {
+            if (log.isDebugEnabled()) {
+                log.debug("sendEof({}) already closing or closed", this);
+            }
+            return;
+        }
+
         if (log.isDebugEnabled()) {
             log.debug("sendEof({}) SSH_MSG_CHANNEL_EOF", this);
         }
+
         Session s = getSession();
         Buffer buffer = s.createBuffer(SshConstants.SSH_MSG_CHANNEL_EOF, 
Short.SIZE);
         buffer.putInt(getRecipient());
         writePacket(buffer);
     }
 
+    public boolean isEofSent() {
+        return eofSent.get();
+    }
+
     @Override
     public Map<String, Object> getProperties() {
         return properties;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
index 78be9cc..c045cc3 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java
@@ -85,6 +85,12 @@ public interface Channel extends ChannelListenerManager, 
PropertyResolver, Close
     void init(ConnectionService service, Session session, int id) throws 
IOException;
 
     /**
+     * @return {@code true} if call to {@link #init(ConnectionService, 
Session, int)} was
+     * successfully completed
+     */
+    boolean isInitialized();
+
+    /**
      * For a server channel, this method will actually open the channel
      *
      * @param recipient  Recipient identifier

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
index 4a4078c..9c1d2d4 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java
@@ -69,6 +69,8 @@ public interface ChannelListener extends EventListener {
      * {@link #channelOpenFailure(Channel, Throwable)} have been called
      *
      * @param channel The referenced {@link Channel}
+     * @param reason The reason why the channel is being closed - if {@code 
null}
+     * then normal closure
      */
-    void channelClosed(Channel channel);
+    void channelClosed(Channel channel, Throwable reason);
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
index 27fe429..9525bd2 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
@@ -50,6 +50,7 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
     private final long maxWaitTimeout;
     private final Logger log;
     private final byte cmd;
+    private final boolean eofOnClose;
     private final byte[] b = new byte[1];
     private final AtomicBoolean closedState = new AtomicBoolean(false);
     private Buffer buffer;
@@ -57,20 +58,25 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
     private int lastSize;
     private boolean noDelay;
 
-    public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, 
Logger log, byte cmd) {
-        this(channel, remoteWindow, 
PropertyResolverUtils.getLongProperty(channel, WAIT_FOR_SPACE_TIMEOUT, 
DEFAULT_WAIT_FOR_SPACE_TIMEOUT), log, cmd);
+    public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, 
Logger log, byte cmd, boolean eofOnClose) {
+        this(channel, remoteWindow, 
PropertyResolverUtils.getLongProperty(channel, WAIT_FOR_SPACE_TIMEOUT, 
DEFAULT_WAIT_FOR_SPACE_TIMEOUT), log, cmd, eofOnClose);
     }
 
-    public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, 
long maxWaitTimeout, Logger log, byte cmd) {
+    public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, 
long maxWaitTimeout, Logger log, byte cmd, boolean eofOnClose) {
         this.channel = ValidateUtils.checkNotNull(channel, "No channel");
         this.remoteWindow = ValidateUtils.checkNotNull(remoteWindow, "No 
remote window");
         ValidateUtils.checkTrue(maxWaitTimeout > 0L, "Non-positive max. wait 
time: %d", maxWaitTimeout);
         this.maxWaitTimeout = maxWaitTimeout;
         this.log = ValidateUtils.checkNotNull(log, "No logger");
         this.cmd = cmd;
+        this.eofOnClose = eofOnClose;
         newBuffer(0);
     }
 
+    public boolean isEofOnClose() {
+        return eofOnClose;
+    }
+
     public void setNoDelay(boolean noDelay) {
         this.noDelay = noDelay;
     }
@@ -200,6 +206,9 @@ public class ChannelOutputStream extends OutputStream 
implements Channel {
 
             try {
                 flush();
+                if (isEofOnClose()) {
+                    channel.sendEof();
+                }
             } finally {
                 closedState.set(true);
             }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/channel/OpenChannelException.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/OpenChannelException.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/OpenChannelException.java
new file mode 100644
index 0000000..829858a
--- /dev/null
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/OpenChannelException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+/**
+ * Documents failure of a channel to open as expected.
+ *
+ * @author <a href="mailto:[email protected]";>Apache MINA SSHD Project</a>
+ */
+public class OpenChannelException extends Exception {
+    private static final long serialVersionUID = 3861183351970782341L;
+    private final int code;
+
+    public OpenChannelException(int code, String message) {
+        this(code, message, null);
+    }
+
+    public OpenChannelException(int code, String message, Throwable cause) {
+        super(message, cause);
+        this.code = code;
+    }
+
+    /**
+     * The reason code as specified by RFC 4254.
+     * <ul>
+     * <li>{@link 
org.apache.sshd.common.SshConstants#SSH_OPEN_ADMINISTRATIVELY_PROHIBITED}
+     * <li>{@link org.apache.sshd.common.SshConstants#SSH_OPEN_CONNECT_FAILED}
+     * <li>{@link 
org.apache.sshd.common.SshConstants#SSH_OPEN_UNKNOWN_CHANNEL_TYPE}
+     * <li>{@link 
org.apache.sshd.common.SshConstants#SSH_OPEN_RESOURCE_SHORTAGE}
+     * </ul>
+     *
+     * @return reason code; 0 if no standardized reason code is given.
+     */
+    public int getReasonCode() {
+        return code;
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index 44856bd..c531686 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -114,7 +114,7 @@ public class TcpipClientChannel extends 
AbstractClientChannel {
         if (streaming == Streaming.Async) {
             throw new IllegalArgumentException("Asynchronous streaming isn't 
supported yet on this channel");
         }
-        out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA);
+        out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA, true);
         invertedIn = out;
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
index 34f726a..a1e93a0 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
@@ -36,6 +36,8 @@ import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelListener;
+import org.apache.sshd.common.channel.OpenChannelException;
 import org.apache.sshd.common.channel.RequestHandler;
 import org.apache.sshd.common.channel.Window;
 import org.apache.sshd.common.forward.TcpipForwarder;
@@ -47,7 +49,6 @@ import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.BufferUtils;
 import org.apache.sshd.common.util.closeable.AbstractInnerCloseable;
-import org.apache.sshd.server.channel.OpenChannelException;
 import org.apache.sshd.server.x11.X11ForwardSupport;
 
 /**
@@ -134,7 +135,7 @@ public abstract class AbstractConnectionService<S extends 
AbstractSession> exten
 
     @Override
     public int registerChannel(Channel channel) throws IOException {
-        final Session session = getSession();
+        Session session = getSession();
         int maxChannels = PropertyResolverUtils.getIntProperty(session, 
MAX_CONCURRENT_CHANNELS_PROP, DEFAULT_MAX_CHANNELS);
         int curSize = channels.size();
         if (curSize > maxChannels) {
@@ -143,12 +144,17 @@ public abstract class AbstractConnectionService<S extends 
AbstractSession> exten
 
         int channelId = getNextChannelId();
         channel.init(this, session, channelId);
+
+        boolean registered = false;
         synchronized (lock) {
-            if (isClosing()) {
-                throw new IllegalStateException("Session is being closed: " + 
toString());
+            if (!isClosing()) {
+                channels.put(channelId, channel);
+                registered = true;
             }
+        }
 
-            channels.put(channelId, channel);
+        if (!registered) {
+            handleChannelRegistrationFailure(channel, channelId);
         }
 
         if (log.isDebugEnabled()) {
@@ -157,6 +163,23 @@ public abstract class AbstractConnectionService<S extends 
AbstractSession> exten
         return channelId;
     }
 
+    protected void handleChannelRegistrationFailure(Channel channel, int 
channelId) throws IOException {
+        RuntimeException reason = new IllegalStateException("Channel id=" + 
channelId + " not registered because session is being closed: " + this);
+        ChannelListener listener = channel.getChannelListenerProxy();
+        try {
+            listener.channelClosed(channel, reason);
+        } catch (Throwable err) {
+            Throwable ignored = GenericUtils.peelException(err);
+            log.warn("registerChannel({})[{}] failed ({}) to inform of channel 
closure: {}",
+                     this, channel, ignored.getClass().getSimpleName(), 
ignored.getMessage());
+            if (log.isDebugEnabled()) {
+                log.debug("registerChannel(" + this + ")[" + channel + "] 
inform closure failure details", ignored);
+            }
+        }
+
+        throw reason;
+    }
+
     /**
      * Remove this channel from the list of managed channels
      *
@@ -365,13 +388,18 @@ public abstract class AbstractConnectionService<S extends 
AbstractSession> exten
      * @throws IOException if the channel does not exists
      */
     protected Channel getChannel(Buffer buffer) throws IOException {
-        int recipient = buffer.getInt();
+        return getChannel(buffer.getInt(), buffer);
+    }
+
+    protected Channel getChannel(int recipient, Buffer buffer) throws 
IOException {
         Channel channel = channels.get(recipient);
         if (channel == null) {
-            buffer.rpos(buffer.rpos() - 5);
-            int cmd = buffer.getUByte();
+            byte[] data = buffer.array();
+            int curPos = buffer.rpos();
+            int cmd = (curPos >= 5) ? data[curPos - 5] & 0xFF : -1;
             throw new SshException("Received " + 
SshConstants.getCommandMessageName(cmd) + " on unknown channel " + recipient);
         }
+
         return channel;
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
index 1e4e140..9765091 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java
@@ -85,7 +85,8 @@ public abstract class AbstractServerChannel extends 
AbstractChannel implements S
             Throwable e = GenericUtils.peelException(t);
             try {
                 listener.channelOpenFailure(this, e);
-            } catch (Throwable ignored) {
+            } catch (Throwable err) {
+                Throwable ignored = GenericUtils.peelException(err);
                 log.warn("doInit({}) failed ({}) to inform listener of open 
failure={}: {}",
                          this, ignored.getClass().getSimpleName(), 
e.getClass().getSimpleName(), ignored.getMessage());
                 if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java 
b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index ca8ba28..1df3dff 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -203,6 +203,10 @@ public class ChannelSession extends AbstractServerChannel {
                 log.debug("handleEof({}) failed ({}) to close receiver: {}",
                           this, e.getClass().getSimpleName(), e.getMessage());
             }
+
+            if (log.isTraceEnabled()) {
+                log.trace("handleEof(" + this + ") receiver close failure 
details", e);
+            }
         }
     }
 
@@ -515,8 +519,8 @@ public class ChannelSession extends AbstractServerChannel {
             ((AsyncCommand) command).setIoOutputStream(asyncOut);
             ((AsyncCommand) command).setIoErrorStream(asyncErr);
         } else {
-            out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA);
-            err = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA);
+            out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA, false);
+            err = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA, false);
             if (log.isTraceEnabled()) {
                 // Wrap in logging filters
                 String channelId = toString();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/server/channel/OpenChannelException.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/channel/OpenChannelException.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/channel/OpenChannelException.java
deleted file mode 100644
index 6a606ca..0000000
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/channel/OpenChannelException.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sshd.server.channel;
-
-/**
- * Documents failure of a channel to open as expected.
- *
- * @author <a href="mailto:[email protected]";>Apache MINA SSHD Project</a>
- */
-public class OpenChannelException extends Exception {
-    private static final long serialVersionUID = 3861183351970782341L;
-    private final int code;
-
-    public OpenChannelException(int code, String message) {
-        this(code, message, null);
-    }
-
-    public OpenChannelException(int code, String message, Throwable cause) {
-        super(message, cause);
-        this.code = code;
-    }
-
-    /**
-     * The reason code as specified by RFC 4254.
-     * <ul>
-     * <li>{@link 
org.apache.sshd.common.SshConstants#SSH_OPEN_ADMINISTRATIVELY_PROHIBITED}
-     * <li>{@link org.apache.sshd.common.SshConstants#SSH_OPEN_CONNECT_FAILED}
-     * <li>{@link 
org.apache.sshd.common.SshConstants#SSH_OPEN_UNKNOWN_CHANNEL_TYPE}
-     * <li>{@link 
org.apache.sshd.common.SshConstants#SSH_OPEN_RESOURCE_SHORTAGE}
-     * </ul>
-     *
-     * @return reason code; 0 if no standardized reason code is given.
-     */
-    public int getReasonCode() {
-        return code;
-    }
-}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index ced89cb..acd5410 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -32,6 +32,7 @@ import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.channel.ChannelFactory;
 import org.apache.sshd.common.channel.ChannelListener;
 import org.apache.sshd.common.channel.ChannelOutputStream;
+import org.apache.sshd.common.channel.OpenChannelException;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoConnectFuture;
@@ -48,7 +49,6 @@ import org.apache.sshd.common.util.net.SshdSocketAddress;
 import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
 import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.sshd.server.channel.AbstractServerChannel;
-import org.apache.sshd.server.channel.OpenChannelException;
 
 /**
  * TODO Add javadoc
@@ -142,7 +142,7 @@ public class TcpipServerChannel extends 
AbstractServerChannel {
         }
 
         // TODO: revisit for better threading. Use async io ?
-        out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA);
+        out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA, true);
         IoHandler handler = new IoHandler() {
             @SuppressWarnings("synthetic-access")
             @Override
@@ -202,7 +202,8 @@ public class TcpipServerChannel extends 
AbstractServerChannel {
             Throwable e = GenericUtils.peelException(t);
             try {
                 listener.channelOpenFailure(this, e);
-            } catch (Throwable ignored) {
+            } catch (Throwable err) {
+                Throwable ignored = GenericUtils.peelException(err);
                 log.warn("handleChannelConnectResult({})[exception] failed 
({}) to inform listener of open failure={}: {}",
                          this, ignored.getClass().getSimpleName(), 
e.getClass().getSimpleName(), ignored.getMessage());
                 if (log.isDebugEnabled()) {
@@ -225,18 +226,19 @@ public class TcpipServerChannel extends 
AbstractServerChannel {
         ChannelListener listener = getChannelListenerProxy();
         try {
             listener.channelOpenFailure(this, problem);
-        } catch (Throwable ignored) {
+        } catch (Throwable err) {
+            Throwable ignored = GenericUtils.peelException(err);
             log.warn("handleChannelOpenFailure({}) failed ({}) to inform 
listener of open failure={}: {}",
                      this, ignored.getClass().getSimpleName(), 
problem.getClass().getSimpleName(), ignored.getMessage());
+            if (log.isDebugEnabled()) {
+                log.debug("handleChannelOpenFailure(" + this + ") listener 
inform open failure details", ignored);
+            }
         }
 
         closeImmediately0();
 
         if (problem instanceof ConnectException) {
-            f.setException(new OpenChannelException(
-                    SshConstants.SSH_OPEN_CONNECT_FAILED,
-                    problem.getMessage(),
-                    problem));
+            f.setException(new 
OpenChannelException(SshConstants.SSH_OPEN_CONNECT_FAILED, 
problem.getMessage(), problem));
         } else {
             f.setException(problem);
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java 
b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
index 6edc918..b6eebad 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
@@ -222,7 +222,7 @@ public class X11ForwardSupport extends 
AbstractInnerCloseable implements IoHandl
             if (streaming == Streaming.Async) {
                 throw new IllegalArgumentException("Asynchronous streaming 
isn't supported yet on this channel");
             }
-            out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA);
+            out = new ChannelOutputStream(this, remoteWindow, log, 
SshConstants.SSH_MSG_CHANNEL_DATA, true);
             invertedIn = out;
         }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java
 
b/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java
index b2148f2..a66aca8 100644
--- 
a/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java
+++ 
b/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java
@@ -169,7 +169,7 @@ public class ClientAuthenticationManagerTest extends 
BaseTestSupport {
             }
 
             @Override
-            public void channelClosed(Channel channel) {
+            public void channelClosed(Channel channel, Throwable reason) {
                 // ignored
             }
         });

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java 
b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
index 2978e17..1572978 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
@@ -275,7 +275,7 @@ public class ClientTest extends BaseTestSupport {
             }
 
             @Override
-            public void channelClosed(Channel channel) {
+            public void channelClosed(Channel channel, Throwable reason) {
                 updateChannelConfigProperty(channel, "channelClosed");
             }
 
@@ -315,6 +315,11 @@ public class ClientTest extends BaseTestSupport {
         final Logger log = LoggerFactory.getLogger(getClass());
         client.addChannelListener(new ChannelListener() {
             @Override
+            public void channelInitialized(Channel channel) {
+                handleChannelEvent("Initialized", channel);
+            }
+
+            @Override
             public void channelOpenSuccess(Channel channel) {
                 handleChannelEvent("OpenSuccess", channel);
             }
@@ -330,13 +335,8 @@ public class ClientTest extends BaseTestSupport {
             }
 
             @Override
-            public void channelInitialized(Channel channel) {
-                handleChannelEvent("Initialized", channel);
-            }
-
-            @Override
-            public void channelClosed(Channel channel) {
-                handleChannelEvent("Closed", channel);
+            public void channelClosed(Channel channel, Throwable reason) {
+                log.info("channelClosed(" + channel + ") reason=" + reason);
             }
 
             private void handleChannelEvent(String name, Channel channel) {
@@ -347,7 +347,7 @@ public class ClientTest extends BaseTestSupport {
                     }
                 }
 
-                log.info("handleChannelEvent({})[{}]", name, id);
+                log.info("handleChannelEvent({})[{}] id={}", channel, name, 
id);
                 throw new ChannelFailureException(name);
             }
         });
@@ -359,29 +359,44 @@ public class ClientTest extends BaseTestSupport {
              InputStream inPipe = new PipedInputStream(pipedIn);
              ByteArrayOutputStream out = new ByteArrayOutputStream();
              ByteArrayOutputStream err = new ByteArrayOutputStream()) {
-            // we expect failures either on channel init or open - the one on 
close is ignored...
+            // we expect failures either on channel init or open
             for (int retryCount = 0; retryCount <= 3; retryCount++) {
                 try {
+                    out.reset();
+                    err.reset();
+
                     try(ChannelShell channel = session.createShellChannel()) {
                         channel.setIn(inPipe);
                         channel.setOut(out);
                         channel.setErr(err);
-                        channel.open().verify(6L, TimeUnit.SECONDS);
+                        channel.open().verify(11L, TimeUnit.SECONDS);
+
+                        log.info("Channel established at retry#" + retryCount);
+                        try (OutputStream stdin = channel.getInvertedIn()) {
+                            stdin.write((getCurrentTestName() + "-retry#" + 
retryCount + "\n").getBytes(StandardCharsets.UTF_8));
+                        }
                         break;  // 1st success means all methods have been 
invoked
                     }
                 } catch (IOException e) {
+                    outputDebugMessage("%s at retry #%d: %s", 
e.getClass().getSimpleName(), retryCount, e.getMessage());
                     synchronized (eventsMap) {
                         eventsMap.remove("Closed"); // since it is called 
anyway but does not cause an IOException
                         assertTrue("Unexpected failure at retry #" + 
retryCount, eventsMap.size() < 3);
                     }
+                } catch(IllegalStateException e) {
+                    // sometimes due to timing issues we get this problem
+                    assertTrue("Premature exception phase - count=" + 
retryCount, retryCount > 0);
+                    assertTrue("Session not closing", session.isClosing() || 
session.isClosed());
+                    log.warn("Session closing prematurely: " + session);
+                    return;
                 }
             }
         } finally {
             client.stop();
         }
 
-        assertEquals("Mismatched total failures count on test end", 3, 
eventsMap.size());
-        assertEquals("Mismatched open failures count on test end", 1, 
failuresSet.size());
+        assertEquals("Mismatched total failures count on test end", 2, 
eventsMap.size());
+        assertEquals("Mismatched open failures count on test end: " + 
failuresSet, 1, failuresSet.size());
     }
 
     @Test
@@ -404,7 +419,7 @@ public class ClientTest extends BaseTestSupport {
             }
 
             @Override
-            public void channelClosed(Channel channel) {
+            public void channelClosed(Channel channel, Throwable reason) {
                 assertSame("Mismatched closed channel instances", channel, 
channelHolder.getAndSet(null));
             }
         });
@@ -1476,7 +1491,8 @@ public class ClientTest extends BaseTestSupport {
         private final String name;
 
         public ChannelFailureException(String name) {
-            this.name = ValidateUtils.checkNotNullAndNotEmpty(name, "No event 
name provided");
+            super(ValidateUtils.checkNotNullAndNotEmpty(name, "No event name 
provided"));
+            this.name = name;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java
----------------------------------------------------------------------
diff --git 
a/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java
 
b/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java
index 153eee9..c3faa17 100644
--- 
a/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java
+++ 
b/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java
@@ -94,7 +94,7 @@ public class TestChannelListener extends AbstractLoggingBean 
implements ChannelL
     }
 
     @Override
-    public void channelClosed(Channel channel) {
+    public void channelClosed(Channel channel, Throwable reason) {
         Assert.assertTrue("Unknown closed channel instance: " + channel, 
activeChannels.remove(channel));
         modificationsCounter.release();
         log.info("channelClosed({})", channel);

Reply via email to