Repository: mina-sshd Updated Branches: refs/heads/master 89768c3ee -> c644ad82d
[SSHD-629] Use consistent max. packet size configuration throughout the code Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/c644ad82 Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/c644ad82 Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/c644ad82 Branch: refs/heads/master Commit: c644ad82d579773e1483b78b5052dceafd0868b6 Parents: 89768c3 Author: Lyor Goldstein <[email protected]> Authored: Wed Jan 27 14:30:54 2016 +0200 Committer: Lyor Goldstein <[email protected]> Committed: Wed Jan 27 14:30:54 2016 +0200 ---------------------------------------------------------------------- .../sshd/agent/local/AgentForwardedChannel.java | 7 ++- .../agent/local/ChannelAgentForwarding.java | 2 +- .../sshd/agent/unix/AgentForwardedChannel.java | 7 ++- .../sshd/agent/unix/ChannelAgentForwarding.java | 2 +- .../client/channel/AbstractClientChannel.java | 18 +++++-- .../sshd/client/channel/ChannelDirectTcpip.java | 14 ++++-- .../sshd/client/channel/ChannelSession.java | 12 +++-- .../sshd/common/channel/AbstractChannel.java | 51 ++++++++++++++------ .../sshd/common/forward/TcpipClientChannel.java | 11 +++-- .../server/channel/AbstractServerChannel.java | 4 +- .../sshd/server/channel/ChannelSession.java | 13 +++-- .../sshd/server/forward/TcpipServerChannel.java | 7 +-- .../sshd/server/x11/X11ForwardSupport.java | 11 +++-- .../sshd/server/channel/ChannelSessionTest.java | 4 +- 14 files changed, 111 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java index 377798e..29d70f2 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java @@ -30,6 +30,7 @@ import org.apache.sshd.agent.common.AbstractAgentProxy; import org.apache.sshd.client.channel.AbstractClientChannel; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.ChannelOutputStream; +import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; @@ -73,7 +74,9 @@ public class AgentForwardedChannel extends AbstractClientChannel { OutputStream outputStream = getInvertedIn(); outputStream.write(buffer.array(), buffer.rpos(), buffer.available()); outputStream.flush(); - localWindow.consumeAndCheck(buffer.available()); + + Window wLocal = getLocalWindow(); + wLocal.consumeAndCheck(buffer.available()); if (messages.isEmpty()) { messages.wait(); } @@ -87,7 +90,7 @@ public class AgentForwardedChannel extends AbstractClientChannel { @Override protected void doOpen() throws IOException { ValidateUtils.checkTrue(!Streaming.Async.equals(streaming), "Asynchronous streaming isn't supported yet on this channel"); - invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); + invertedIn = new ChannelOutputStream(this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java index 2aefbdc..9a2309c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java @@ -56,7 +56,7 @@ public class ChannelAgentForwarding extends AbstractServerChannel { final OpenFuture f = new DefaultOpenFuture(this); ChannelListener listener = getChannelListenerProxy(); try { - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); + out = new ChannelOutputStream(this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true); Session session = getSession(); FactoryManager manager = ValidateUtils.checkNotNull(session.getFactoryManager(), "No factory manager"); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java index ecd3283..f2897c2 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import org.apache.sshd.client.channel.AbstractClientChannel; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.ChannelOutputStream; +import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.util.ValidateUtils; import org.apache.tomcat.jni.Socket; import org.apache.tomcat.jni.Status; @@ -63,7 +64,7 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn @Override protected synchronized void doOpen() throws IOException { ValidateUtils.checkTrue(!Streaming.Async.equals(streaming), "Asynchronous streaming isn't supported yet on this channel"); - invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); + invertedIn = new ChannelOutputStream(this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true); } @Override @@ -74,7 +75,9 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn @Override protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException { - localWindow.consumeAndCheck(len); + Window wLocal = getLocalWindow(); + wLocal.consumeAndCheck(len); + int result = Socket.send(socket, data, off, len); if (result < Status.APR_SUCCESS) { AgentServerProxy.throwException(result); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java index 06e2c92..f76954b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java +++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java @@ -77,7 +77,7 @@ public class ChannelAgentForwarding extends AbstractServerChannel { final OpenFuture f = new DefaultOpenFuture(this); ChannelListener listener = getChannelListenerProxy(); try { - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); + out = new ChannelOutputStream(this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true); authSocket = PropertyResolverUtils.getString(this, SshAgent.SSH_AUTHSOCKET_ENV_NAME); pool = Pool.create(AprLibrary.getInstance().getRootPool()); handle = Local.create(authSocket, pool); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java index e14b318..f3eb272 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java @@ -40,6 +40,7 @@ import org.apache.sshd.common.channel.AbstractChannel; import org.apache.sshd.common.channel.ChannelAsyncInputStream; import org.apache.sshd.common.channel.ChannelAsyncOutputStream; import org.apache.sshd.common.channel.ChannelListener; +import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.io.IoInputStream; import org.apache.sshd.common.io.IoOutputStream; import org.apache.sshd.common.session.Session; @@ -288,11 +289,12 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C } Session session = getSession(); + Window wLocal = getLocalWindow(); Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN, type.length() + Integer.SIZE); buffer.putString(type); buffer.putInt(getId()); - buffer.putInt(localWindow.getSize()); - buffer.putInt(localWindow.getPacketSize()); + buffer.putInt(wLocal.getSize()); + buffer.putInt(wLocal.getPacketSize()); writePacket(buffer); return openFuture; } @@ -308,7 +310,9 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C Session session = getSession(); FactoryManager manager = ValidateUtils.checkNotNull(session.getFactoryManager(), "No factory manager"); - this.remoteWindow.init(rwSize, packetSize, manager.getProperties()); + Window wRemote = getRemoteWindow(); + wRemote.init(rwSize, packetSize, manager.getProperties()); + ChannelListener listener = getChannelListenerProxy(); try { doOpen(); @@ -377,8 +381,10 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C } else if (out != null) { out.write(data, off, len); out.flush(); + if (invertedOut == null) { - localWindow.consumeAndCheck(len); + Window wLocal = getLocalWindow(); + wLocal.consumeAndCheck(len); } } else { throw new IllegalStateException("No output stream for channel"); @@ -396,8 +402,10 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C } else if (err != null) { err.write(data, off, len); err.flush(); + if (invertedErr == null) { - localWindow.consumeAndCheck(len); + Window wLocal = getLocalWindow(); + wLocal.consumeAndCheck(len); } } else { throw new IllegalStateException("No error stream for channel"); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java index 81f733b..0813142 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java @@ -31,6 +31,7 @@ import org.apache.sshd.common.channel.ChannelAsyncOutputStream; import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.channel.ChannelPipedInputStream; import org.apache.sshd.common.channel.ChannelPipedOutputStream; +import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.net.SshdSocketAddress; @@ -76,12 +77,13 @@ public class ChannelDirectTcpip extends AbstractClientChannel { Session session = getSession(); String remoteName = remote.getHostName(); String localName = local.getHostName(); + Window wLocal = getLocalWindow(); Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN, type.length() + remoteName.length() + localName.length() + Long.SIZE); buffer.putString(type); buffer.putInt(getId()); - buffer.putInt(localWindow.getSize()); - buffer.putInt(localWindow.getPacketSize()); + buffer.putInt(wLocal.getSize()); + buffer.putInt(wLocal.getPacketSize()); buffer.putString(remoteName); buffer.putInt(remote.getPort()); buffer.putString(localName); @@ -96,9 +98,9 @@ public class ChannelDirectTcpip extends AbstractClientChannel { asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA); asyncOut = new ChannelAsyncInputStream(this); } else { - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); + out = new ChannelOutputStream(this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true); invertedIn = out; - ChannelPipedInputStream pis = new ChannelPipedInputStream(this, localWindow); + ChannelPipedInputStream pis = new ChannelPipedInputStream(this, getLocalWindow()); pipe = new ChannelPipedOutputStream(pis); in = pis; invertedOut = in; @@ -109,7 +111,9 @@ public class ChannelDirectTcpip extends AbstractClientChannel { protected void doWriteData(byte[] data, int off, int len) throws IOException { pipe.write(data, off, len); pipe.flush(); - localWindow.consumeAndCheck(len); + + Window wLocal = getLocalWindow(); + wLocal.consumeAndCheck(len); } } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java index 5d951cc..e59f7b3 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java @@ -30,6 +30,7 @@ import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.channel.ChannelPipedInputStream; import org.apache.sshd.common.channel.ChannelPipedOutputStream; import org.apache.sshd.common.channel.RequestHandler; +import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.buffer.Buffer; @@ -69,15 +70,17 @@ public class ChannelSession extends AbstractClientChannel { asyncOut = new ChannelAsyncInputStream(this); asyncErr = new ChannelAsyncInputStream(this); } else { - invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); + invertedIn = new ChannelOutputStream(this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true); + + Window wLocal = getLocalWindow(); if (out == null) { - ChannelPipedInputStream pis = new ChannelPipedInputStream(this, localWindow); + ChannelPipedInputStream pis = new ChannelPipedInputStream(this, wLocal); ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis); out = pos; invertedOut = pis; } if (err == null) { - ChannelPipedInputStream pis = new ChannelPipedInputStream(this, localWindow); + ChannelPipedInputStream pis = new ChannelPipedInputStream(this, wLocal); ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis); err = pos; invertedErr = pis; @@ -159,7 +162,8 @@ public class ChannelSession extends AbstractClientChannel { protected void pumpInputStream() { try { Session session = getSession(); - byte[] buffer = new byte[remoteWindow.getPacketSize()]; + Window wRemote = getRemoteWindow(); + byte[] buffer = new byte[wRemote.getPacketSize()]; while (!closeFuture.isClosed()) { int len = securedRead(in, buffer, 0, buffer.length); if (len < 0) { http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index a958d72..0620ed8 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java @@ -48,7 +48,6 @@ import org.apache.sshd.common.util.Int2IntFunction; import org.apache.sshd.common.util.ValidateUtils; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.BufferUtils; -import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.common.util.closeable.AbstractInnerCloseable; import org.apache.sshd.common.util.closeable.IoBaseCloseable; import org.apache.sshd.common.util.io.IoUtils; @@ -74,8 +73,6 @@ public abstract class AbstractChannel protected ExecutorService executor; protected boolean shutdownExecutor; - protected final Window localWindow; - protected final Window remoteWindow; protected ConnectionService service; protected final AtomicBoolean initialized = new AtomicBoolean(false); protected final AtomicBoolean eofReceived = new AtomicBoolean(false); @@ -92,6 +89,9 @@ public abstract class AbstractChannel private int id = -1; private int recipient = -1; private Session session; + + private final Window localWindow; + private final Window remoteWindow; /** * A {@link Map} of sent requests - key = request name, value = timestamp when * request was sent. @@ -532,7 +532,7 @@ public abstract class AbstractChannel this.channelListeners.clear(); } - IOException err = IoUtils.closeQuietly(localWindow, remoteWindow); + IOException err = IoUtils.closeQuietly(getLocalWindow(), getRemoteWindow()); if (err != null) { if (log.isDebugEnabled()) { log.debug("Failed (" + err.getClass().getSimpleName() + ") to pre-close window(s) of " + this + ": " + err.getMessage()); @@ -571,15 +571,12 @@ public abstract class AbstractChannel @Override public void handleData(Buffer buffer) throws IOException { - int len = buffer.getInt(); - if (len < 0 || len > ByteArrayBuffer.MAX_LEN) { - throw new IllegalStateException("Bad item length: " + len); - } + int len = validateIncomingDataSize(SshConstants.SSH_MSG_CHANNEL_DATA, buffer.getInt()); if (log.isDebugEnabled()) { log.debug("handleData({}) SSH_MSG_CHANNEL_DATA len={}", this, len); } if (log.isTraceEnabled()) { - log.trace("handleData({}) data: {}", this, BufferUtils.printHex(buffer.array(), buffer.rpos(), len)); + log.trace("handleData({}) data: {} ...", this, BufferUtils.printHex(buffer.array(), buffer.rpos(), Math.min(len, Byte.MAX_VALUE))); } if (isEofSignalled()) { // TODO consider throwing an exception @@ -602,10 +599,8 @@ public abstract class AbstractChannel writePacket(buffer); return; } - int len = buffer.getInt(); - if ((len < 0) || (len > ByteArrayBuffer.MAX_LEN)) { - throw new IllegalStateException("Bad item length: " + len); - } + + int len = validateIncomingDataSize(SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA, buffer.getInt()); if (log.isDebugEnabled()) { log.debug("handleExtendedData({}) SSH_MSG_CHANNEL_EXTENDED_DATA len={}", this, len); } @@ -620,6 +615,32 @@ public abstract class AbstractChannel doWriteExtendedData(buffer.array(), buffer.rpos(), len); } + protected int validateIncomingDataSize(int cmd, int len) { + /* + * According to RFC 4254 section 5.1 + * + * The 'maximum packet size' specifies the maximum size of an + * individual data packet that can be sent to the sender + * + * The local window reflects our preference - i.e., how much our peer + * should send at most + */ + Window wLocal = getLocalWindow(); + int maxLocalSize = wLocal.getPacketSize(); + + /* + * The reason for the +4 is that there seems to be some confusion whether + * the max. packet size includes the length field or not + */ + if ((len < 0) || (len > (maxLocalSize + 4))) { + throw new IllegalStateException("Bad length (" + len + ") " + + " for cmd=" + SshConstants.getCommandMessageName(cmd) + + " - max. allowed=" + maxLocalSize); + } + + return len; + } + @Override public void handleEof() throws IOException { if (eofReceived.getAndSet(true)) { @@ -643,7 +664,9 @@ public abstract class AbstractChannel if (log.isDebugEnabled()) { log.debug("handleWindowAdjust({}) SSH_MSG_CHANNEL_WINDOW_ADJUST window={}", this, window); } - remoteWindow.expand(window); + + Window wRemote = getRemoteWindow(); + wRemote.expand(window); } @Override http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java index c531686..94c75a3 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java @@ -29,6 +29,7 @@ import org.apache.sshd.common.Closeable; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; import org.apache.sshd.common.channel.ChannelOutputStream; +import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.io.IoSession; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.buffer.Buffer; @@ -95,12 +96,13 @@ public class TcpipClientChannel extends AbstractClientChannel { String srcHost = srcAddress.getHostAddress(); InetAddress dstAddress = dst.getAddress(); String dstHost = dstAddress.getHostAddress(); + Window wLocal = getLocalWindow(); Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN, type.length() + srcHost.length() + dstHost.length() + Long.SIZE); buffer.putString(type); buffer.putInt(getId()); - buffer.putInt(localWindow.getSize()); - buffer.putInt(localWindow.getPacketSize()); + buffer.putInt(wLocal.getSize()); + buffer.putInt(wLocal.getPacketSize()); buffer.putString(dstHost); buffer.putInt(dst.getPort()); buffer.putString(srcHost); @@ -114,7 +116,7 @@ public class TcpipClientChannel extends AbstractClientChannel { if (streaming == Streaming.Async) { throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel"); } - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); + out = new ChannelOutputStream(this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true); invertedIn = out; } @@ -127,7 +129,8 @@ public class TcpipClientChannel extends AbstractClientChannel { protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException { // Make sure we copy the data as the incoming buffer may be reused Buffer buf = ByteArrayBuffer.getCompactClone(data, off, len); - localWindow.consumeAndCheck(len); + Window wLocal = getLocalWindow(); + wLocal.consumeAndCheck(len); serverSession.write(buf); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java index 8e2f5df..3edb2b5 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/AbstractServerChannel.java @@ -26,6 +26,7 @@ import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.channel.AbstractChannel; import org.apache.sshd.common.channel.ChannelListener; +import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.util.GenericUtils; import org.apache.sshd.common.util.ValidateUtils; @@ -60,7 +61,8 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S Session s = getSession(); FactoryManager manager = ValidateUtils.checkNotNull(s.getFactoryManager(), "No factory manager"); - this.remoteWindow.init(rwSize, packetSize, manager.getProperties()); + Window wRemote = getRemoteWindow(); + wRemote.init(rwSize, packetSize, manager.getProperties()); configureWindow(); return doInit(buffer); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java index 5da3993..7363372 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java @@ -40,6 +40,7 @@ import org.apache.sshd.common.channel.ChannelAsyncOutputStream; import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.channel.PtyMode; import org.apache.sshd.common.channel.RequestHandler; +import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.file.FileSystemAware; import org.apache.sshd.common.file.FileSystemFactory; import org.apache.sshd.common.future.CloseFuture; @@ -174,7 +175,7 @@ public class ChannelSession extends AbstractServerChannel { } } - IOException e = IoUtils.closeQuietly(remoteWindow, out, err, receiver); + IOException e = IoUtils.closeQuietly(getRemoteWindow(), out, err, receiver); if (e != null) { if (log.isDebugEnabled()) { log.debug("doCloseImmediately({}) failed ({}) to close resources: {}", @@ -220,7 +221,8 @@ public class ChannelSession extends AbstractServerChannel { if (receiver != null) { int r = receiver.data(this, data, off, len); if (r > 0) { - localWindow.consumeAndCheck(r); + Window wLocal = getLocalWindow(); + wLocal.consumeAndCheck(r); } } else { if (tempBuffer == null) { @@ -520,8 +522,9 @@ public class ChannelSession extends AbstractServerChannel { ((AsyncCommand) command).setIoOutputStream(asyncOut); ((AsyncCommand) command).setIoErrorStream(asyncErr); } else { - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, false); - err = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA, false); + Window wRemote = getRemoteWindow(); + out = new ChannelOutputStream(this, wRemote, log, SshConstants.SSH_MSG_CHANNEL_DATA, false); + err = new ChannelOutputStream(this, wRemote, log, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA, false); if (log.isTraceEnabled()) { // Wrap in logging filters String channelId = toString(); @@ -539,7 +542,7 @@ public class ChannelSession extends AbstractServerChannel { setDataReceiver(recv); ((AsyncCommand) command).setIoInputStream(recv.getIn()); } else { - PipeDataReceiver recv = new PipeDataReceiver(this, localWindow); + PipeDataReceiver recv = new PipeDataReceiver(this, getLocalWindow()); setDataReceiver(recv); command.setInputStream(recv.getIn()); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java index c9a3920..11ff1b4 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java @@ -34,6 +34,7 @@ import org.apache.sshd.common.channel.ChannelFactory; import org.apache.sshd.common.channel.ChannelListener; import org.apache.sshd.common.channel.ChannelOutputStream; import org.apache.sshd.common.channel.OpenChannelException; +import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoConnectFuture; @@ -152,7 +153,7 @@ public class TcpipServerChannel extends AbstractServerChannel { } // TODO: revisit for better threading. Use async io ? - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); + out = new ChannelOutputStream(this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true); IoHandler handler = new IoHandler() { @SuppressWarnings("synthetic-access") @Override @@ -348,11 +349,11 @@ public class TcpipServerChannel extends AbstractServerChannel { // Make sure we copy the data as the incoming buffer may be reused Buffer buf = ByteArrayBuffer.getCompactClone(data, off, len); ioSession.write(buf).addListener(new SshFutureListener<IoWriteFuture>() { - @SuppressWarnings("synthetic-access") @Override public void operationComplete(IoWriteFuture future) { try { - localWindow.consumeAndCheck(len); + Window wLocal = getLocalWindow(); + wLocal.consumeAndCheck(len); } catch (IOException e) { Session session = getSession(); session.exceptionCaught(e); http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java index 9f90614..8c3f864 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java @@ -34,6 +34,7 @@ import org.apache.sshd.common.PropertyResolverUtils; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; import org.apache.sshd.common.channel.ChannelOutputStream; +import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.io.IoAcceptor; import org.apache.sshd.common.io.IoHandler; import org.apache.sshd.common.io.IoServiceFactory; @@ -205,12 +206,13 @@ public class X11ForwardSupport extends AbstractInnerCloseable implements IoHandl InetAddress remoteAddress = remote.getAddress(); String remoteHost = remoteAddress.getHostAddress(); + Window wLocal = getLocalWindow(); Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN, remoteHost.length() + type.length() + Integer.SIZE); buffer.putString(type); buffer.putInt(getId()); - buffer.putInt(localWindow.getSize()); - buffer.putInt(localWindow.getPacketSize()); + buffer.putInt(wLocal.getSize()); + buffer.putInt(wLocal.getPacketSize()); buffer.putString(remoteHost); buffer.putInt(remote.getPort()); writePacket(buffer); @@ -222,7 +224,7 @@ public class X11ForwardSupport extends AbstractInnerCloseable implements IoHandl if (streaming == Streaming.Async) { throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel"); } - out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA, true); + out = new ChannelOutputStream(this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true); invertedIn = out; } @@ -233,7 +235,8 @@ public class X11ForwardSupport extends AbstractInnerCloseable implements IoHandl @Override protected synchronized void doWriteData(byte[] data, int off, int len) throws IOException { - localWindow.consumeAndCheck(len); + Window wLocal = getLocalWindow(); + wLocal.consumeAndCheck(len); // use a clone in case data buffer is re-used serverSession.write(ByteArrayBuffer.getCompactClone(data, off, len)); } http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/c644ad82/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java ---------------------------------------------------------------------- diff --git a/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java b/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java index 97c5cc3..addfa75 100644 --- a/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.sshd.common.PropertyResolverUtils; import org.apache.sshd.common.channel.ChannelAsyncOutputStream; +import org.apache.sshd.common.channel.Window; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.util.test.BaseTestSupport; @@ -48,7 +49,8 @@ public class ChannelSessionTest extends BaseTestSupport { try (ChannelSession channelSession = new ChannelSession() { { - this.remoteWindow.init(PropertyResolverUtils.toPropertyResolver(Collections.<String,Object>emptyMap())); + Window wRemote = getRemoteWindow(); + wRemote.init(PropertyResolverUtils.toPropertyResolver(Collections.<String,Object>emptyMap())); } }) { final AtomicBoolean expanded = new AtomicBoolean(false);
