Author: rgodfrey
Date: Fri Dec 12 13:28:28 2014
New Revision: 1644870

URL: http://svn.apache.org/r1644870
Log:
Notify engine when transport is blocked

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java?rev=1644870&r1=1644869&r2=1644870&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java Fri Dec 12 13:28:28 2014 @@ -31,7 +31,7 @@ import javax.net.ssl.SSLContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.transport.Receiver; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; @@ -50,7 +50,7 @@ public class NonBlockingConnection imple private final Object _lock = new Object(); public NonBlockingConnection(SocketChannel socket, - Receiver<ByteBuffer> delegate, + ServerProtocolEngine delegate, int sendBufferSize, int receiveBufferSize, long timeout, Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java?rev=1644870&r1=1644869&r2=1644870&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java Fri Dec 12 13:28:28 2014 @@ -24,7 +24,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.StandardSocketOptions; -import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Set; @@ -34,10 +33,9 @@ import javax.net.ssl.SSLContext; import org.slf4j.LoggerFactory; import org.apache.qpid.configuration.CommonProperties; -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.NetworkConnection; @@ -54,7 +52,7 @@ public class NonBlockingNetworkTransport private AcceptingThread _acceptor; protected NonBlockingConnection createNetworkConnection(final SocketChannel socket, - final Receiver<ByteBuffer> engine, + final ServerProtocolEngine engine, final Integer sendBufferSize, final Integer receiveBufferSize, final int timeout, @@ -166,7 +164,8 @@ public class NonBlockingNetworkTransport { socket = _serverSocket.accept(); - final ProtocolEngine engine = 
_factory.newProtocolEngine(socket.socket().getRemoteSocketAddress()); + final ServerProtocolEngine engine = + (ServerProtocolEngine) _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress()); if(engine != null) { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1644870&r1=1644869&r2=1644870&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Fri Dec 12 13:28:28 2014 @@ -41,8 +41,8 @@ import javax.net.ssl.SSLPeerUnverifiedEx import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; @@ -63,7 +63,7 @@ public class NonBlockingSenderReceiver private final Thread _ioThread; private final String _remoteSocketAddress; private final AtomicBoolean _closed = new AtomicBoolean(false); - private final Receiver<ByteBuffer> _receiver; + private final ServerProtocolEngine _receiver; private final int _receiveBufSize; private final Ticker _ticker; private final Set<TransportEncryption> _encryptionSet; @@ -79,7 +79,7 @@ public class NonBlockingSenderReceiver public NonBlockingSenderReceiver(final SocketChannel socketChannel, - Receiver<ByteBuffer> receiver, + ServerProtocolEngine receiver, int 
receiveBufSize, Ticker ticker, final Set<TransportEncryption> encryptionSet, @@ -202,9 +202,10 @@ public class NonBlockingSenderReceiver LOGGER.debug("Number Ready " + numberReady); - doWrite(); + _receiver.setTransportBlockedForWriting(!doWrite()); doRead(); boolean fullyWritten = doWrite(); + _receiver.setTransportBlockedForWriting(!fullyWritten); _socketChannel.register(_selector, fullyWritten --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org