Repository: mina-sshd Updated Branches: refs/heads/master 56cc5356f -> 88c0c819d
[SSHD-580] getInvertedIn.close() does not close stdin of remote program Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/88c0c819 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/88c0c819 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/88c0c819 Branch: refs/heads/master Commit: 88c0c819dab1d1c1c59240433135562c3ca1f5a7 Parents: 56cc535 Author: Lyor Goldstein <[email protected]> Authored: Thu Jan 21 13:50:31 2016 +0200 Committer: Lyor Goldstein <[email protected]> Committed: Thu Jan 21 13:50:31 2016 +0200 ---------------------------------------------------------------------- .../sshd/agent/local/AgentForwardedChannel.java | 7 +- .../agent/local/ChannelAgentForwarding.java | 13 ++- .../sshd/agent/unix/AgentForwardedChannel.java | 7 +- .../sshd/agent/unix/ChannelAgentForwarding.java | 14 ++- .../client/channel/AbstractClientChannel.java | 6 +- .../sshd/client/channel/ChannelDirectTcpip.java | 2 +- .../sshd/client/channel/ChannelSession.java | 4 +- .../sshd/common/channel/AbstractChannel.java | 99 ++++++++++++++++---- .../org/apache/sshd/common/channel/Channel.java | 6 ++ .../sshd/common/channel/ChannelListener.java | 4 +- .../common/channel/ChannelOutputStream.java | 15 ++- .../common/channel/OpenChannelException.java | 53 +++++++++++ .../sshd/common/forward/TcpipClientChannel.java | 2 +- .../session/AbstractConnectionService.java | 44 +++++++-- .../server/channel/AbstractServerChannel.java | 3 +- .../sshd/server/channel/ChannelSession.java | 8 +- .../server/channel/OpenChannelException.java | 53 ----------- .../sshd/server/forward/TcpipServerChannel.java | 18 ++-- .../sshd/server/x11/X11ForwardSupport.java | 2 +- .../client/ClientAuthenticationManagerTest.java | 2 +- .../java/org/apache/sshd/client/ClientTest.java | 46 ++++++--- .../common/channel/TestChannelListener.java | 2 +- 22 files changed, 270 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java index 8bc292b..377798e 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java @@ -30,6 +30,7 @@ import org.apache.sshd.agent.common.AbstractAgentProxy; import org.apache.sshd.client.channel.AbstractClientChannel; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.ChannelOutputStream; +import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; @@ -85,10 +86,8 @@ public class AgentForwardedChannel extends AbstractClientChannel { @Override protected void doOpen() throws IOException { - if (streaming == Streaming.Async) { - throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel"); - } - invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA); + ValidateUtils.checkTrue(!Streaming.Async.equals(streaming), "Asynchronous streaming isn't supported yet on this channel"); + invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java index 929361f..cf0f707 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java @@ -56,7 +56,7 @@ public class ChannelAgentForwarding extends AbstractServerChannel { final OpenFuture f = new DefaultOpenFuture(this); ChannelListener listener = getChannelListenerProxy(); try { - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA); + out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); Session session = getSession(); FactoryManager manager = ValidateUtils.checkNotNull(session.getFactoryManager(), "No factory manager"); @@ -70,9 +70,13 @@ public class ChannelAgentForwarding extends AbstractServerChannel { Throwable e = GenericUtils.peelException(t); try { listener.channelOpenFailure(this, e); - } catch (Throwable ignored) { + } catch (Throwable err) { + Throwable ignored = GenericUtils.peelException(err); log.warn("doInit({}) failed ({}) to inform listener of open failure={}: {}", this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage()); + if (log.isDebugEnabled()) { + log.debug("doInit(" + this + ") inform listener open failure details", ignored); + } } f.setException(e); } @@ -100,11 +104,6 @@ public class ChannelAgentForwarding extends AbstractServerChannel { } @Override - public void handleEof() throws IOException { - super.handleEof(); - } - - @Override protected void doWriteData(byte[] data, int off, int len) throws IOException { client.messageReceived(new ByteArrayBuffer(data, off, len)); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java index 8ed5c07..4cf3ad8 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.sshd.client.channel.AbstractClientChannel; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.ChannelOutputStream; +import org.apache.sshd.common.util.ValidateUtils; import org.apache.tomcat.jni.Socket; import org.apache.tomcat.jni.Status; @@ -58,10 +59,8 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn @Override protected synchronized void doOpen() throws IOException { - if (streaming == Streaming.Async) { - throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel"); - } - invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA); + ValidateUtils.checkTrue(!Streaming.Async.equals(streaming), "Asynchronous streaming isn't supported yet on this channel"); + invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java index ab83c1f..3093321 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java @@ -77,7 +77,7 @@ public class ChannelAgentForwarding extends AbstractServerChannel { final OpenFuture f = new DefaultOpenFuture(this); ChannelListener listener = getChannelListenerProxy(); try { - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA); + out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); authSocket = PropertyResolverUtils.getString(this, SshAgent.SSH_AUTHSOCKET_ENV_NAME); pool = Pool.create(AprLibrary.getInstance().getRootPool()); handle = Local.create(authSocket, pool); @@ -119,9 +119,13 @@ public class ChannelAgentForwarding extends AbstractServerChannel { Throwable e = GenericUtils.peelException(t); try { listener.channelOpenFailure(this, e); - } catch (Throwable ignored) { + } catch (Throwable err) { + Throwable ignored = GenericUtils.peelException(err); log.warn("doInit({}) failed ({}) to inform listener of open failure={}: {}", this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage()); + if (log.isDebugEnabled()) { + log.debug("doInit(" + this + ") inform listener open failure details", ignored); + } } f.setException(e); } @@ -173,12 +177,6 @@ public class ChannelAgentForwarding extends AbstractServerChannel { } @Override - public void handleEof() throws IOException { - super.handleEof(); -// close(true); - } - - @Override protected void doWriteData(byte[] data, int off, int len) throws IOException { int result = Socket.send(handle, data, off, len); if (result < Status.APR_SUCCESS) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java index 9ae646f..5673280 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java @@ -320,9 +320,13 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C Throwable e = GenericUtils.peelException(t); try { listener.channelOpenFailure(this, e); - } catch (Throwable ignored) { + } catch (Throwable err) { + Throwable ignored = GenericUtils.peelException(err); log.warn("handleOpenSuccess({}) failed ({}) to inform listener of open failure={}: {}", this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage()); + if (log.isDebugEnabled()) { + log.debug("handleOpenSuccess(" + this + ") inform listener open failure details", ignored); + } } this.openFuture.setException(e); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java index 945d431..81f733b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java @@ -96,7 +96,7 @@ public class ChannelDirectTcpip extends AbstractClientChannel { asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA); asyncOut = new ChannelAsyncInputStream(this); } else { - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA); + out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); invertedIn = out; ChannelPipedInputStream pis = new ChannelPipedInputStream(this, localWindow); pipe = new ChannelPipedOutputStream(pis); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java index 91fe1e3..aa50f3e 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java @@ -52,7 +52,7 @@ public class ChannelSession extends AbstractClientChannel { @Override protected void doOpen() throws IOException { - if (streaming == Streaming.Async) { + if (Streaming.Async.equals(streaming)) { asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) { @SuppressWarnings("synthetic-access") @Override @@ -69,7 +69,7 @@ public class ChannelSession extends AbstractClientChannel { asyncOut = new ChannelAsyncInputStream(this); asyncErr = new ChannelAsyncInputStream(this); } else { - invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA); + invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); if (out == null) { ChannelPipedInputStream pis = new ChannelPipedInputStream(this, localWindow); ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index 51eae39..62caf9f 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java @@ -77,7 +77,9 @@ public abstract class AbstractChannel protected final Window localWindow; protected final Window remoteWindow; protected ConnectionService service; - protected final AtomicBoolean eof = new AtomicBoolean(false); + protected final AtomicBoolean initialized = new AtomicBoolean(false); + protected final AtomicBoolean eofReceived = new AtomicBoolean(false); + protected final AtomicBoolean eofSent = new AtomicBoolean(false); protected AtomicReference<GracefulState> gracefulState = new AtomicReference<GracefulState>(GracefulState.Opened); protected final DefaultCloseFuture gracefulFuture = new DefaultCloseFuture(lock); protected final List<RequestHandler<Channel>> handlers = new ArrayList<RequestHandler<Channel>>(); @@ -327,11 +329,17 @@ public abstract class AbstractChannel listener.channelInitialized(this); } catch (RuntimeException t) { Throwable e = GenericUtils.peelException(t); - throw new IOException("Failed (" + e.getClass().getSimpleName() + ") to notify channel " + toString() + " initialization: " + e.getMessage(), e); + throw new IOException("Failed (" + e.getClass().getSimpleName() + ") to notify channel " + this + " initialization: " + e.getMessage(), e); } // delegate the rest of the notifications to the channel addChannelListener(listener); configureWindow(); + initialized.set(true); + } + + @Override + public boolean isInitialized() { + return initialized.get(); } protected void notifyStateChanged() { @@ -373,8 +381,15 @@ public abstract class AbstractChannel @Override public void handleClose() throws IOException { if (log.isDebugEnabled()) { - log.debug("handleClose({}) SSH_MSG_CHANNEL_CLOSE on channel", this); + log.debug("handleClose({}) SSH_MSG_CHANNEL_CLOSE", this); + } + + if (!eofSent.getAndSet(true)) { + if (log.isDebugEnabled()) { + log.debug("handleClose({}) prevent sending EOF", this); + } } + if (gracefulState.compareAndSet(GracefulState.Opened, GracefulState.CloseReceived)) { close(false); } else if (gracefulState.compareAndSet(GracefulState.CloseSent, GracefulState.Closed)) { @@ -383,12 +398,22 @@ public abstract class AbstractChannel } @Override + public CloseFuture close(boolean immediately) { + if (!eofSent.getAndSet(true)) { + if (log.isDebugEnabled()) { + log.debug("close({}) prevent sending EOF", this); + } + } + + return super.close(immediately); + } + + @Override protected Closeable getInnerCloseable() { return new GracefulChannelCloseable(); } public class GracefulChannelCloseable extends IoBaseCloseable { - private final AtomicBoolean closing = new AtomicBoolean(false); public GracefulChannelCloseable() { @@ -413,13 +438,17 @@ public abstract class AbstractChannel public CloseFuture close(final boolean immediately) { final Channel channel = AbstractChannel.this; if (log.isDebugEnabled()) { - log.debug("close({})[immediately={}] SSH_MSG_CHANNEL_CLOSE on channel", channel, immediately); + log.debug("close({})[immediately={}] processing", channel, immediately); } setClosing(true); if (immediately) { gracefulFuture.setClosed(); } else if (!gracefulFuture.isClosed()) { + if (log.isDebugEnabled()) { + log.debug("close({})[immediately={}] send SSH_MSG_CHANNEL_CLOSE", channel, immediately); + } + Session s = getSession(); Buffer buffer = s.createBuffer(SshConstants.SSH_MSG_CHANNEL_CLOSE, Short.SIZE); buffer.putInt(getRecipient()); @@ -452,6 +481,10 @@ public abstract class AbstractChannel log.debug("close({})[immediately={}] {} while writing SSH_MSG_CHANNEL_CLOSE packet on channel: {}", channel, immediately, e.getClass().getSimpleName(), e.getMessage()); } + + if (log.isTraceEnabled()) { + log.trace("close(" + channel + ")[immediately=" + immediately + "] packet write failure details", e); + } channel.close(true); } } @@ -473,10 +506,13 @@ public abstract class AbstractChannel protected void preClose() { ChannelListener listener = getChannelListenerProxy(); try { - listener.channelClosed(this); + listener.channelClosed(this, null); } catch (RuntimeException t) { Throwable e = GenericUtils.peelException(t); - log.warn(e.getClass().getSimpleName() + " while signal channel " + toString() + " closed: " + e.getMessage(), e); + log.warn("preClose({}) {} while signal channel closed: {}", this, e.getClass().getSimpleName(), e.getMessage()); + if (log.isDebugEnabled()) { + log.debug("preClose(" + this + ") channel closed signalling failure details", e); + } } finally { // clear the listeners since we are closing the channel (quicker GC) this.channelListeners.clear(); @@ -531,6 +567,10 @@ public abstract class AbstractChannel if (log.isTraceEnabled()) { log.trace("handleData({}) data: {}", this, BufferUtils.printHex(buffer.array(), buffer.rpos(), len)); } + if (isEofSignalled()) { + // TODO consider throwing an exception + log.warn("handleData({}) extra {} bytes sent after EOF", this, len); + } doWriteData(buffer.array(), buffer.rpos(), len); } @@ -558,26 +598,30 @@ public abstract class AbstractChannel if (log.isTraceEnabled()) { log.trace("handleExtendedData({}) extended data: {}", this, BufferUtils.printHex(buffer.array(), buffer.rpos(), len)); } + if (isEofSignalled()) { + // TODO consider throwing an exception + log.warn("handleExtendedData({}) extra {} bytes sent after EOF", this, len); + } doWriteExtendedData(buffer.array(), buffer.rpos(), len); } - public boolean isEofSignalled() { - return eof.get(); - } - - public void setEofSignalled(boolean on) { - eof.set(on); - } - @Override public void handleEof() throws IOException { - if (log.isDebugEnabled()) { - log.debug("handleEof({}) SSH_MSG_CHANNEL_EOF", this); + if (eofReceived.getAndSet(true)) { + // TODO consider throwing an exception + log.warn("handleEof({}) already signalled", this); + } else { + if (log.isDebugEnabled()) { + log.debug("handleEof({}) SSH_MSG_CHANNEL_EOF", this); + } } - setEofSignalled(true); notifyStateChanged(); } + public boolean isEofSignalled() { + return eofReceived.get(); + } + @Override public void handleWindowAdjust(Buffer buffer) throws IOException { int window = buffer.getInt(); @@ -607,15 +651,34 @@ public abstract class AbstractChannel protected abstract void doWriteExtendedData(byte[] data, int off, int len) throws IOException; protected void sendEof() throws IOException { + if (eofSent.getAndSet(true)) { + if (log.isDebugEnabled()) { + log.debug("sendEof({}) already sent", this); + } + return; + } + + if (isClosing()) { + if (log.isDebugEnabled()) { + log.debug("sendEof({}) already closing or closed", this); + } + return; + } + if (log.isDebugEnabled()) { log.debug("sendEof({}) SSH_MSG_CHANNEL_EOF", this); } + Session s = getSession(); Buffer buffer = s.createBuffer(SshConstants.SSH_MSG_CHANNEL_EOF, Short.SIZE); buffer.putInt(getRecipient()); writePacket(buffer); } + public boolean isEofSent() { + return eofSent.get(); + } + @Override public Map<String, Object> getProperties() { return properties; http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java index 78be9cc..c045cc3 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Channel.java @@ -85,6 +85,12 @@ public interface Channel extends ChannelListenerManager, PropertyResolver, Close void init(ConnectionService service, Session session, int id) throws IOException; /** + * @return {@code true} if call to {@link #init(ConnectionService, Session, int)} was + * successfully completed + */ + boolean isInitialized(); + + /** * For a server channel, this method will actually open the channel * * @param recipient Recipient identifier http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java index 4a4078c..9c1d2d4 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelListener.java @@ -69,6 +69,8 @@ public interface ChannelListener extends EventListener { * {@link #channelOpenFailure(Channel, Throwable)} have been called * * @param channel The referenced {@link Channel} + * @param reason The reason why the channel is being closed - if {@code null} + * then normal closure */ - void channelClosed(Channel channel); + void channelClosed(Channel channel, Throwable reason); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java index 27fe429..9525bd2 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java @@ -50,6 +50,7 @@ public class ChannelOutputStream extends OutputStream implements Channel { private final long maxWaitTimeout; private final Logger log; private final byte cmd; + private final boolean eofOnClose; private final byte[] b = new byte[1]; private final AtomicBoolean closedState = new AtomicBoolean(false); private Buffer buffer; @@ -57,20 +58,25 @@ public class ChannelOutputStream extends OutputStream implements Channel { private int lastSize; private boolean noDelay; - public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, Logger log, byte cmd) { - this(channel, remoteWindow, PropertyResolverUtils.getLongProperty(channel, WAIT_FOR_SPACE_TIMEOUT, DEFAULT_WAIT_FOR_SPACE_TIMEOUT), log, cmd); + public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, Logger log, byte cmd, boolean eofOnClose) { + this(channel, remoteWindow, PropertyResolverUtils.getLongProperty(channel, WAIT_FOR_SPACE_TIMEOUT, DEFAULT_WAIT_FOR_SPACE_TIMEOUT), log, cmd, eofOnClose); } - public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, long maxWaitTimeout, Logger log, byte cmd) { + public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, long maxWaitTimeout, Logger log, byte cmd, boolean eofOnClose) { this.channel = ValidateUtils.checkNotNull(channel, "No channel"); this.remoteWindow = ValidateUtils.checkNotNull(remoteWindow, "No remote window"); ValidateUtils.checkTrue(maxWaitTimeout > 0L, "Non-positive max. wait time: %d", maxWaitTimeout); this.maxWaitTimeout = maxWaitTimeout; this.log = ValidateUtils.checkNotNull(log, "No logger"); this.cmd = cmd; + this.eofOnClose = eofOnClose; newBuffer(0); } + public boolean isEofOnClose() { + return eofOnClose; + } + public void setNoDelay(boolean noDelay) { this.noDelay = noDelay; } @@ -200,6 +206,9 @@ public class ChannelOutputStream extends OutputStream implements Channel { try { flush(); + if (isEofOnClose()) { + channel.sendEof(); + } } finally { closedState.set(true); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/channel/OpenChannelException.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/OpenChannelException.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/OpenChannelException.java new file mode 100644 index 0000000..829858a --- /dev/null +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/OpenChannelException.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sshd.common.channel; + +/** + * Documents failure of a channel to open as expected. + * + * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a> + */ +public class OpenChannelException extends Exception { + private static final long serialVersionUID = 3861183351970782341L; + private final int code; + + public OpenChannelException(int code, String message) { + this(code, message, null); + } + + public OpenChannelException(int code, String message, Throwable cause) { + super(message, cause); + this.code = code; + } + + /** + * The reason code as specified by RFC 4254. + * <ul> + * <li>{@link org.apache.sshd.common.SshConstants#SSH_OPEN_ADMINISTRATIVELY_PROHIBITED} + * <li>{@link org.apache.sshd.common.SshConstants#SSH_OPEN_CONNECT_FAILED} + * <li>{@link org.apache.sshd.common.SshConstants#SSH_OPEN_UNKNOWN_CHANNEL_TYPE} + * <li>{@link org.apache.sshd.common.SshConstants#SSH_OPEN_RESOURCE_SHORTAGE} + * </ul> + * + * @return reason code; 0 if no standardized reason code is given. + */ + public int getReasonCode() { + return code; + } +} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java index 44856bd..c531686 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java @@ -114,7 +114,7 @@ public class TcpipClientChannel extends AbstractClientChannel { if (streaming == Streaming.Async) { throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel"); } - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA); + out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); invertedIn = out; } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java index 34f726a..a1e93a0 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java @@ -36,6 +36,8 @@ import org.apache.sshd.common.PropertyResolverUtils; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; import org.apache.sshd.common.channel.Channel; +import org.apache.sshd.common.channel.ChannelListener; +import org.apache.sshd.common.channel.OpenChannelException; import org.apache.sshd.common.channel.RequestHandler; import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.forward.TcpipForwarder; @@ -47,7 +49,6 @@ import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.BufferUtils; import org.apache.sshd.common.util.closeable.AbstractInnerCloseable; -import org.apache.sshd.server.channel.OpenChannelException; import org.apache.sshd.server.x11.X11ForwardSupport; /** @@ -134,7 +135,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten @Override public int registerChannel(Channel channel) throws IOException { - final Session session = getSession(); + Session session = getSession(); int maxChannels = PropertyResolverUtils.getIntProperty(session, MAX_CONCURRENT_CHANNELS_PROP, DEFAULT_MAX_CHANNELS); int curSize = channels.size(); if (curSize > maxChannels) { @@ -143,12 +144,17 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten int channelId = getNextChannelId(); channel.init(this, session, channelId); + + boolean registered = false; synchronized (lock) { - if (isClosing()) { - throw new IllegalStateException("Session is being closed: " + toString()); + if (!isClosing()) { + channels.put(channelId, channel); + registered = true; } + } - channels.put(channelId, channel); + if (!registered) { + handleChannelRegistrationFailure(channel, channelId); } if (log.isDebugEnabled()) { @@ -157,6 +163,23 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten return channelId; } + protected void handleChannelRegistrationFailure(Channel channel, int channelId) throws IOException { + RuntimeException reason = new IllegalStateException("Channel id=" + channelId + " not registered because session is being closed: " + this); + ChannelListener listener = channel.getChannelListenerProxy(); + try { + listener.channelClosed(channel, reason); + } catch (Throwable err) { + Throwable ignored = GenericUtils.peelException(err); + log.warn("registerChannel({})[{}] failed ({}) to inform of channel closure: {}", + this, channel, ignored.getClass().getSimpleName(), ignored.getMessage()); + if (log.isDebugEnabled()) { + log.debug("registerChannel(" + this + ")[" + channel + "] inform closure failure details", ignored); + } + } + + throw reason; + } + /** * Remove this channel from the list of managed channels * @@ -365,13 +388,18 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten * @throws IOException if the channel does not exists */ protected Channel getChannel(Buffer buffer) throws IOException { - int recipient = buffer.getInt(); + return getChannel(buffer.getInt(), buffer); + } + + protected Channel getChannel(int recipient, Buffer buffer) throws IOException { Channel channel = channels.get(recipient); if (channel == null) { - buffer.rpos(buffer.rpos() - 5); - int cmd = buffer.getUByte(); + byte[] data = buffer.array(); + int curPos = buffer.rpos(); + int cmd = (curPos >= 5) ? data[curPos - 5] & 0xFF : -1; throw new SshException("Received " + SshConstants.getCommandMessageName(cmd) + " on unknown channel " + recipient); } + return channel; } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java index 1e4e140..9765091 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java @@ -85,7 +85,8 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S Throwable e = GenericUtils.peelException(t); try { listener.channelOpenFailure(this, e); - } catch (Throwable ignored) { + } catch (Throwable err) { + Throwable ignored = GenericUtils.peelException(err); log.warn("doInit({}) failed ({}) to inform listener of open failure={}: {}", this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage()); if (log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java index ca8ba28..1df3dff 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java @@ -203,6 +203,10 @@ public class ChannelSession extends AbstractServerChannel { log.debug("handleEof({}) failed ({}) to close receiver: {}", this, e.getClass().getSimpleName(), e.getMessage()); } + + if (log.isTraceEnabled()) { + log.trace("handleEof(" + this + ") receiver close failure details", e); + } } } @@ -515,8 +519,8 @@ public class ChannelSession extends AbstractServerChannel { ((AsyncCommand) command).setIoOutputStream(asyncOut); ((AsyncCommand) command).setIoErrorStream(asyncErr); } else { - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA); - err = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA); + out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, false); + err = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA, false); if (log.isTraceEnabled()) { // Wrap in logging filters String channelId = toString(); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/server/channel/OpenChannelException.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/OpenChannelException.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/OpenChannelException.java deleted file mode 100644 index 6a606ca..0000000 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/OpenChannelException.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sshd.server.channel; - -/** - * Documents failure of a channel to open as expected. - * - * @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a> - */ -public class OpenChannelException extends Exception { - private static final long serialVersionUID = 3861183351970782341L; - private final int code; - - public OpenChannelException(int code, String message) { - this(code, message, null); - } - - public OpenChannelException(int code, String message, Throwable cause) { - super(message, cause); - this.code = code; - } - - /** - * The reason code as specified by RFC 4254. - * <ul> - * <li>{@link org.apache.sshd.common.SshConstants#SSH_OPEN_ADMINISTRATIVELY_PROHIBITED} - * <li>{@link org.apache.sshd.common.SshConstants#SSH_OPEN_CONNECT_FAILED} - * <li>{@link org.apache.sshd.common.SshConstants#SSH_OPEN_UNKNOWN_CHANNEL_TYPE} - * <li>{@link org.apache.sshd.common.SshConstants#SSH_OPEN_RESOURCE_SHORTAGE} - * </ul> - * - * @return reason code; 0 if no standardized reason code is given. - */ - public int getReasonCode() { - return code; - } -} http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java index ced89cb..acd5410 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java @@ -32,6 +32,7 @@ import org.apache.sshd.common.channel.Channel; import org.apache.sshd.common.channel.ChannelFactory; import org.apache.sshd.common.channel.ChannelListener; import org.apache.sshd.common.channel.ChannelOutputStream; +import org.apache.sshd.common.channel.OpenChannelException; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoConnectFuture; @@ -48,7 +49,6 @@ import org.apache.sshd.common.util.net.SshdSocketAddress; import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; import org.apache.sshd.common.util.threads.ThreadUtils; import org.apache.sshd.server.channel.AbstractServerChannel; -import org.apache.sshd.server.channel.OpenChannelException; /** * TODO Add javadoc @@ -142,7 +142,7 @@ public class TcpipServerChannel extends AbstractServerChannel { } // TODO: revisit for better threading. Use async io ? - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA); + out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); IoHandler handler = new IoHandler() { @SuppressWarnings("synthetic-access") @Override @@ -202,7 +202,8 @@ public class TcpipServerChannel extends AbstractServerChannel { Throwable e = GenericUtils.peelException(t); try { listener.channelOpenFailure(this, e); - } catch (Throwable ignored) { + } catch (Throwable err) { + Throwable ignored = GenericUtils.peelException(err); log.warn("handleChannelConnectResult({})[exception] failed ({}) to inform listener of open failure={}: {}", this, ignored.getClass().getSimpleName(), e.getClass().getSimpleName(), ignored.getMessage()); if (log.isDebugEnabled()) { @@ -225,18 +226,19 @@ public class TcpipServerChannel extends AbstractServerChannel { ChannelListener listener = getChannelListenerProxy(); try { listener.channelOpenFailure(this, problem); - } catch (Throwable ignored) { + } catch (Throwable err) { + Throwable ignored = GenericUtils.peelException(err); log.warn("handleChannelOpenFailure({}) failed ({}) to inform listener of open failure={}: {}", this, ignored.getClass().getSimpleName(), problem.getClass().getSimpleName(), ignored.getMessage()); + if (log.isDebugEnabled()) { + log.debug("handleChannelOpenFailure(" + this + ") listener inform open failure details", ignored); + } } closeImmediately0(); if (problem instanceof ConnectException) { - f.setException(new OpenChannelException( - SshConstants.SSH_OPEN_CONNECT_FAILED, - problem.getMessage(), - problem)); + f.setException(new OpenChannelException(SshConstants.SSH_OPEN_CONNECT_FAILED, problem.getMessage(), problem)); } else { f.setException(problem); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java index 6edc918..b6eebad 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java @@ -222,7 +222,7 @@ public class X11ForwardSupport extends AbstractInnerCloseable implements IoHandl if (streaming == Streaming.Async) { throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel"); } - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA); + out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); invertedIn = out; } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java index b2148f2..a66aca8 100644 --- a/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientAuthenticationManagerTest.java @@ -169,7 +169,7 @@ public class ClientAuthenticationManagerTest extends BaseTestSupport { } @Override - public void channelClosed(Channel channel) { + public void channelClosed(Channel channel, Throwable reason) { // ignored } }); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java index 2978e17..1572978 100644 --- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java @@ -275,7 +275,7 @@ public class ClientTest extends BaseTestSupport { } @Override - public void channelClosed(Channel channel) { + public void channelClosed(Channel channel, Throwable reason) { updateChannelConfigProperty(channel, "channelClosed"); } @@ -315,6 +315,11 @@ public class ClientTest extends BaseTestSupport { final Logger log = LoggerFactory.getLogger(getClass()); client.addChannelListener(new ChannelListener() { @Override + public void channelInitialized(Channel channel) { + handleChannelEvent("Initialized", channel); + } + + @Override public void channelOpenSuccess(Channel channel) { handleChannelEvent("OpenSuccess", channel); } @@ -330,13 +335,8 @@ public class ClientTest extends BaseTestSupport { } @Override - public void channelInitialized(Channel channel) { - handleChannelEvent("Initialized", channel); - } - - @Override - public void channelClosed(Channel channel) { - handleChannelEvent("Closed", channel); + public void channelClosed(Channel channel, Throwable reason) { + log.info("channelClosed(" + channel + ") reason=" + reason); } private void handleChannelEvent(String name, Channel channel) { @@ -347,7 +347,7 @@ public class ClientTest extends BaseTestSupport { } } - log.info("handleChannelEvent({})[{}]", name, id); + log.info("handleChannelEvent({})[{}] id={}", channel, name, id); throw new ChannelFailureException(name); } }); @@ -359,29 +359,44 @@ public class ClientTest extends BaseTestSupport { InputStream inPipe = new PipedInputStream(pipedIn); ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream err = new ByteArrayOutputStream()) { - // we expect failures either on channel init or open - the one on close is ignored... + // we expect failures either on channel init or open for (int retryCount = 0; retryCount <= 3; retryCount++) { try { + out.reset(); + err.reset(); + try(ChannelShell channel = session.createShellChannel()) { channel.setIn(inPipe); channel.setOut(out); channel.setErr(err); - channel.open().verify(6L, TimeUnit.SECONDS); + channel.open().verify(11L, TimeUnit.SECONDS); + + log.info("Channel established at retry#" + retryCount); + try (OutputStream stdin = channel.getInvertedIn()) { + stdin.write((getCurrentTestName() + "-retry#" + retryCount + "\n").getBytes(StandardCharsets.UTF_8)); + } break; // 1st success means all methods have been invoked } } catch (IOException e) { + outputDebugMessage("%s at retry #%d: %s", e.getClass().getSimpleName(), retryCount, e.getMessage()); synchronized (eventsMap) { eventsMap.remove("Closed"); // since it is called anyway but does not cause an IOException assertTrue("Unexpected failure at retry #" + retryCount, eventsMap.size() < 3); } + } catch(IllegalStateException e) { + // sometimes due to timing issues we get this problem + assertTrue("Premature exception phase - count=" + retryCount, retryCount > 0); + assertTrue("Session not closing", session.isClosing() || session.isClosed()); + log.warn("Session closing prematurely: " + session); + return; } } } finally { client.stop(); } - assertEquals("Mismatched total failures count on test end", 3, eventsMap.size()); - assertEquals("Mismatched open failures count on test end", 1, failuresSet.size()); + assertEquals("Mismatched total failures count on test end", 2, eventsMap.size()); + assertEquals("Mismatched open failures count on test end: " + failuresSet, 1, failuresSet.size()); } @Test @@ -404,7 +419,7 @@ public class ClientTest extends BaseTestSupport { } @Override - public void channelClosed(Channel channel) { + public void channelClosed(Channel channel, Throwable reason) { assertSame("Mismatched closed channel instances", channel, channelHolder.getAndSet(null)); } }); @@ -1476,7 +1491,8 @@ public class ClientTest extends BaseTestSupport { private final String name; public ChannelFailureException(String name) { - this.name = ValidateUtils.checkNotNullAndNotEmpty(name, "No event name provided"); + super(ValidateUtils.checkNotNullAndNotEmpty(name, "No event name provided")); + this.name = name; } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/88c0c819/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java b/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java index 153eee9..c3faa17 100644 --- a/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java +++ b/sshd-core/src/test/java/org/apache/sshd/common/channel/TestChannelListener.java @@ -94,7 +94,7 @@ public class TestChannelListener extends AbstractLoggingBean implements ChannelL } @Override - public void channelClosed(Channel channel) { + public void channelClosed(Channel channel, Throwable reason) { Assert.assertTrue("Unknown closed channel instance: " + channel, activeChannels.remove(channel)); modificationsCounter.release(); log.info("channelClosed({})", channel);
